PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void TwoPhaseTransactionGid (Oid subid, TransactionId xid, char *gid, int szgid)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionData * create_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void apply_worker_exit (void)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker ()
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallback * apply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConn * LogRepWorkerWalRcvConn = NULL
 
Subscription * MySubscription = NULL
 
static bool MySubscriptionValid = false
 
static List * on_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFile * stream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 336 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 195 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg

◆ ApplyExecutionData

typedef struct ApplyExecutionData ApplyExecutionData

◆ ApplySubXactData

typedef struct ApplySubXactData ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 265 of file worker.c.

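266 {
267  /* The action for non-streaming transactions. */
268  TRANS_LEADER_APPLY,
269 
270  /* Actions for streaming transactions. */
271  TRANS_LEADER_SERIALIZE,
272  TRANS_LEADER_SEND_TO_PARALLEL,
273  TRANS_LEADER_PARTIAL_SERIALIZE,
274  TRANS_PARALLEL_APPLY,
275 } TransApplyAction;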
TransApplyAction
Definition: worker.c:266
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:271
@ TRANS_PARALLEL_APPLY
Definition: worker.c:274
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:272
@ TRANS_LEADER_APPLY
Definition: worker.c:268
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:273

Function Documentation

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3272 of file worker.c.

3273 {
3275  LogicalRepMsgType saved_command;
3276 
3277  /*
3278  * Set the current command being applied. Since this function can be
3279  * called recursively when applying spooled changes, save the current
3280  * command.
3281  */
3282  saved_command = apply_error_callback_arg.command;
3284 
3285  switch (action)
3286  {
3287  case LOGICAL_REP_MSG_BEGIN:
3288  apply_handle_begin(s);
3289  break;
3290 
3293  break;
3294 
3297  break;
3298 
3301  break;
3302 
3305  break;
3306 
3309  break;
3310 
3313  break;
3314 
3315  case LOGICAL_REP_MSG_TYPE:
3316  apply_handle_type(s);
3317  break;
3318 
3321  break;
3322 
3324 
3325  /*
3326  * Logical replication does not use generic logical messages yet.
3327  * Although, it could be used by other applications that use this
3328  * output plugin.
3329  */
3330  break;
3331 
3334  break;
3335 
3338  break;
3339 
3342  break;
3343 
3346  break;
3347 
3350  break;
3351 
3354  break;
3355 
3358  break;
3359 
3362  break;
3363 
3366  break;
3367 
3368  default:
3369  ereport(ERROR,
3370  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3371  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3372  }
3373 
3374  /* Reset the current command */
3375  apply_error_callback_arg.command = saved_command;
3376 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1271
static void apply_handle_type(StringInfo s)
Definition: worker.c:2324
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3144
static void apply_handle_update(StringInfo s)
Definition: worker.c:2520
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2131
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1169
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:278
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2704
static void apply_handle_begin(StringInfo s)
Definition: worker.c:991
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1016
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1812
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2301
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1108
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1218
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1626
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1408
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1042
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1467
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2371
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:221

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4893 of file worker.c.

4894 {
4896 
4898  return;
4899 
4900  Assert(errarg->origin_name);
4901 
4902  if (errarg->rel == NULL)
4903  {
4904  if (!TransactionIdIsValid(errarg->remote_xid))
4905  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4906  errarg->origin_name,
4907  logicalrep_message_type(errarg->command));
4908  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4909  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4910  errarg->origin_name,
4912  errarg->remote_xid);
4913  else
4914  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4915  errarg->origin_name,
4917  errarg->remote_xid,
4918  LSN_FORMAT_ARGS(errarg->finish_lsn));
4919  }
4920  else
4921  {
4922  if (errarg->remote_attnum < 0)
4923  {
4924  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4925  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4926  errarg->origin_name,
4928  errarg->rel->remoterel.nspname,
4929  errarg->rel->remoterel.relname,
4930  errarg->remote_xid);
4931  else
4932  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4933  errarg->origin_name,
4935  errarg->rel->remoterel.nspname,
4936  errarg->rel->remoterel.relname,
4937  errarg->remote_xid,
4938  LSN_FORMAT_ARGS(errarg->finish_lsn));
4939  }
4940  else
4941  {
4942  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4943  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4944  errarg->origin_name,
4946  errarg->rel->remoterel.nspname,
4947  errarg->rel->remoterel.relname,
4948  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4949  errarg->remote_xid);
4950  else
4951  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4952  errarg->origin_name,
4954  errarg->rel->remoterel.nspname,
4955  errarg->rel->remoterel.relname,
4956  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4957  errarg->remote_xid,
4958  LSN_FORMAT_ARGS(errarg->finish_lsn));
4959  }
4960  }
4961 }
#define Assert(condition)
Definition: c.h:858
#define errcontext
Definition: elog.h:196
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:226
XLogRecPtr finish_lsn
Definition: worker.c:227
LogicalRepRelMapEntry * rel
Definition: worker.c:222
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert, LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 991 of file worker.c.

992 {
993  LogicalRepBeginData begin_data;
994 
995  /* There must not be an active streaming transaction. */
997 
998  logicalrep_read_begin(s, &begin_data);
999  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1000 
1001  remote_final_lsn = begin_data.final_lsn;
1002 
1004 
1005  in_remote_transaction = true;
1006 
1008 }
bool in_remote_transaction
Definition: worker.c:303
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4965
static XLogRecPtr remote_final_lsn
Definition: worker.c:304
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4763
static TransactionId stream_xid
Definition: worker.c:309
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert, LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1042 of file worker.c.

1043 {
1044  LogicalRepPreparedTxnData begin_data;
1045 
1046  /* Tablesync should never receive prepare. */
1047  if (am_tablesync_worker())
1048  ereport(ERROR,
1049  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1050  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1051 
1052  /* There must not be an active streaming transaction. */
1054 
1055  logicalrep_read_begin_prepare(s, &begin_data);
1056  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1057 
1058  remote_final_lsn = begin_data.prepare_lsn;
1059 
1061 
1062  in_remote_transaction = true;
1063 
1065 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert, ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1016 of file worker.c.

1017 {
1018  LogicalRepCommitData commit_data;
1019 
1020  logicalrep_read_commit(s, &commit_data);
1021 
1022  if (commit_data.commit_lsn != remote_final_lsn)
1023  ereport(ERROR,
1024  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1025  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1026  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1028 
1029  apply_handle_commit_internal(&commit_data);
1030 
1031  /* Process any tables that are being synchronized in parallel. */
1032  process_syncing_tables(commit_data.end_lsn);
1033 
1036 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2241
static void reset_apply_error_context_info(void)
Definition: worker.c:4973
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData * commit_data)
static

Definition at line 2241 of file worker.c.

2242 {
2243  if (is_skipping_changes())
2244  {
2246 
2247  /*
2248  * Start a new transaction to clear the subskiplsn, if not started
2249  * yet.
2250  */
2251  if (!IsTransactionState())
2253  }
2254 
2255  if (IsTransactionState())
2256  {
2257  /*
2258  * The transaction is either non-empty or skipped, so we clear the
2259  * subskiplsn.
2260  */
2262 
2263  /*
2264  * Update origin state so we can restart streaming from correct
2265  * position in case of crash.
2266  */
2267  replorigin_session_origin_lsn = commit_data->end_lsn;
2269 
2271 
2272  if (IsTransactionBlock())
2273  {
2274  EndTransactionBlock(false);
2276  }
2277 
2278  pgstat_report_stat(false);
2279 
2281  }
2282  else
2283  {
2284  /* Process any invalidation messages that might have accumulated. */
2287  }
2288 
2289  in_remote_transaction = false;
2290 }
static void stop_skipping_changes(void)
Definition: worker.c:4790
#define is_skipping_changes()
Definition: worker.c:336
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4812
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3436
void maybe_reread_subscription(void)
Definition: worker.c:3862
void AcceptInvalidationMessages(void)
Definition: inval.c:806
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
long pgstat_report_stat(bool force)
Definition: pgstat.c:579
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:384
void StartTransactionCommand(void)
Definition: xact.c:2995
bool IsTransactionBlock(void)
Definition: xact.c:4915
void CommitTransactionCommand(void)
Definition: xact.c:3093
bool EndTransactionBlock(bool chain)
Definition: xact.c:3993
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:255

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1169 of file worker.c.

1170 {
1171  LogicalRepCommitPreparedTxnData prepare_data;
1172  char gid[GIDSIZE];
1173 
1174  logicalrep_read_commit_prepared(s, &prepare_data);
1175  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1176 
1177  /* Compute GID for two_phase transactions. */
1179  gid, sizeof(gid));
1180 
1181  /* There is no transaction when COMMIT PREPARED is called */
1183 
1184  /*
1185  * Update origin state so we can restart streaming from correct position
1186  * in case of crash.
1187  */
1188  replorigin_session_origin_lsn = prepare_data.end_lsn;
1190 
1191  FinishPreparedTransaction(gid, true);
1194  pgstat_report_stat(false);
1195 
1197  in_remote_transaction = false;
1198 
1199  /* Process any tables that are being synchronized in parallel. */
1200  process_syncing_tables(prepare_data.end_lsn);
1201 
1202  clear_subscription_skip_lsn(prepare_data.end_lsn);
1203 
1206 }
static void begin_replication_step(void)
Definition: worker.c:508
static void end_replication_step(void)
Definition: worker.c:531
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:4405
Subscription * MySubscription
Definition: worker.c:298
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1503
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2704 of file worker.c.

2705 {
2706  LogicalRepRelMapEntry *rel;
2707  LogicalRepTupleData oldtup;
2708  LogicalRepRelId relid;
2709  UserContext ucxt;
2710  ApplyExecutionData *edata;
2711  EState *estate;
2712  TupleTableSlot *remoteslot;
2713  MemoryContext oldctx;
2714  bool run_as_owner;
2715 
2716  /*
2717  * Quick return if we are skipping data modification changes or handling
2718  * streamed transactions.
2719  */
2720  if (is_skipping_changes() ||
2722  return;
2723 
2725 
2726  relid = logicalrep_read_delete(s, &oldtup);
2727  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2728  if (!should_apply_changes_for_rel(rel))
2729  {
2730  /*
2731  * The relation can't become interesting in the middle of the
2732  * transaction so it's safe to unlock it.
2733  */
2736  return;
2737  }
2738 
2739  /* Set relation for error callback */
2741 
2742  /* Check if we can do the delete. */
2744 
2745  /*
2746  * Make sure that any user-supplied code runs as the table owner, unless
2747  * the user has opted out of that behavior.
2748  */
2749  run_as_owner = MySubscription->runasowner;
2750  if (!run_as_owner)
2751  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2752 
2753  /* Initialize the executor state. */
2754  edata = create_edata_for_relation(rel);
2755  estate = edata->estate;
2756  remoteslot = ExecInitExtraTupleSlot(estate,
2757  RelationGetDescr(rel->localrel),
2758  &TTSOpsVirtual);
2759 
2760  /* Build the search tuple. */
2762  slot_store_data(remoteslot, rel, &oldtup);
2763  MemoryContextSwitchTo(oldctx);
2764 
2765  /* For a partitioned table, apply delete to correct partition. */
2766  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2768  remoteslot, NULL, CMD_DELETE);
2769  else
2771  remoteslot, rel->localindexoid);
2772 
2773  finish_edata(edata);
2774 
2775  /* Reset relation for error callback */
2777 
2778  if (!run_as_owner)
2779  RestoreUserContext(&ucxt);
2780 
2782 
2784 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2479
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:652
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:468
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:559
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2893
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:797
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:709
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2792
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1918
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:555
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:268
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
Definition: rel.h:531
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:473
ResultRelInfo * targetRelInfo
Definition: worker.c:211
EState * estate
Definition: worker.c:208
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 2792 of file worker.c.

2796 {
2797  EState *estate = edata->estate;
2798  Relation localrel = relinfo->ri_RelationDesc;
2799  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2800  EPQState epqstate;
2801  TupleTableSlot *localslot;
2802  bool found;
2803 
2804  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2805  ExecOpenIndices(relinfo, false);
2806 
2807  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2808  remoteslot, &localslot);
2809 
2810  /* If found delete it. */
2811  if (found)
2812  {
2813  EvalPlanQualSetSlot(&epqstate, localslot);
2814 
2815  /* Do the actual delete. */
2816  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2817  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2818  }
2819  else
2820  {
2821  /*
2822  * The tuple to be deleted could not be found. Do nothing except for
2823  * emitting a log message.
2824  *
2825  * XXX should this be promoted to ereport(LOG) perhaps?
2826  */
2827  elog(DEBUG1,
2828  "logical replication did not find row to be deleted "
2829  "in replication target relation \"%s\"",
2830  RelationGetRelationName(localrel));
2831  }
2832 
2833  /* Cleanup. */
2834  ExecCloseIndices(relinfo);
2835  EvalPlanQualEnd(&epqstate);
2836 }
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2846
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2339
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2539
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2982
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:244
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define RelationGetRelationName(relation)
Definition: rel.h:539
LogicalRepRelMapEntry * targetRel
Definition: worker.c:210
Relation ri_RelationDesc
Definition: execnodes.h:456

References ACL_DELETE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2371 of file worker.c.

2372 {
2373  LogicalRepRelMapEntry *rel;
2374  LogicalRepTupleData newtup;
2375  LogicalRepRelId relid;
2376  UserContext ucxt;
2377  ApplyExecutionData *edata;
2378  EState *estate;
2379  TupleTableSlot *remoteslot;
2380  MemoryContext oldctx;
2381  bool run_as_owner;
2382 
2383  /*
2384  * Quick return if we are skipping data modification changes or handling
2385  * streamed transactions.
2386  */
2387  if (is_skipping_changes() ||
2388  handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2389  return;
2390 
2391  begin_replication_step();
2392 
2393  relid = logicalrep_read_insert(s, &newtup);
2394  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2395  if (!should_apply_changes_for_rel(rel))
2396  {
2397  /*
2398  * The relation can't become interesting in the middle of the
2399  * transaction so it's safe to unlock it.
2400  */
2401  logicalrep_rel_close(rel, RowExclusiveLock);
2402  end_replication_step();
2403  return;
2404  }
2405 
2406  /*
2407  * Make sure that any user-supplied code runs as the table owner, unless
2408  * the user has opted out of that behavior.
2409  */
2410  run_as_owner = MySubscription->runasowner;
2411  if (!run_as_owner)
2412  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2413 
2414  /* Set relation for error callback */
2415  apply_error_callback_arg.rel = rel;
2416 
2417  /* Initialize the executor state. */
2418  edata = create_edata_for_relation(rel);
2419  estate = edata->estate;
2420  remoteslot = ExecInitExtraTupleSlot(estate,
2421  RelationGetDescr(rel->localrel),
2422  &TTSOpsVirtual);
2423 
2424  /* Process and store remote tuple in the slot */
2425  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2426  slot_store_data(remoteslot, rel, &newtup);
2427  slot_fill_defaults(rel, estate, remoteslot);
2428  MemoryContextSwitchTo(oldctx);
2429 
2430  /* For a partitioned table, insert the tuple into a partition. */
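2431  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2432  apply_handle_tuple_routing(edata,
2433  remoteslot, NULL, CMD_INSERT);
2434  else
2435  apply_handle_insert_internal(edata, edata->targetRelInfo,
2436  remoteslot);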
2437 
2438  finish_edata(edata);
2439 
2440  /* Reset relation for error callback */
2441  apply_error_callback_arg.rel = NULL;
2442 
2443  if (!run_as_owner)
2444  RestoreUserContext(&ucxt);
2445 
2446  logicalrep_rel_close(rel, NoLock);
2447 
2448  end_replication_step();
2449 }
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2457
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:740
@ CMD_INSERT
Definition: nodes.h:267
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2457 of file worker.c.

2460 {
2461  EState *estate = edata->estate;
2462 
2463  /* We must open indexes here. */
2464  ExecOpenIndices(relinfo, false);
2465 
2466  /* Do the insert. */
2467  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2468  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2469 
2470  /* Cleanup. */
2471  ExecCloseIndices(relinfo);
2472 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1408 of file worker.c.

1409 {
1410  /*
1411  * ORIGIN message can only come inside streaming transaction or inside
1412  * remote transaction and before any actual writes.
1413  */
1414  if (!in_streamed_transaction &&
1415  (!in_remote_transaction ||
1416  (IsTransactionState() && !am_tablesync_worker())))
1417  ereport(ERROR,
1418  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1419  errmsg_internal("ORIGIN message sent out of order")));
1420 }
static bool in_streamed_transaction
Definition: worker.c:307

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1108 of file worker.c.

1109 {
1110  LogicalRepPreparedTxnData prepare_data;
1111 
1112  logicalrep_read_prepare(s, &prepare_data);
1113 
1114  if (prepare_data.prepare_lsn != remote_final_lsn)
1115  ereport(ERROR,
1116  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1117  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1118  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1119  LSN_FORMAT_ARGS(remote_final_lsn))));
1120 
1121  /*
1122  * Unlike commit, here, we always prepare the transaction even though no
1123  * change has happened in this transaction or all changes are skipped. It
1124  * is done this way because at commit prepared time, we won't know whether
1125  * we have skipped preparing a transaction because of those reasons.
1126  *
1127  * XXX, We can optimize such that at commit prepared time, we first check
1128  * whether we have prepared the transaction or not but that doesn't seem
1129  * worthwhile because such cases shouldn't be common.
1130  */
1131  begin_replication_step();
1132 
1133  apply_handle_prepare_internal(&prepare_data);
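1134 
1135  end_replication_step();
1136  CommitTransactionCommand();
1137  pgstat_report_stat(false);
1138 
1139  store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1140 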
1141  in_remote_transaction = false;
1142 
1143  /* Process any tables that are being synchronized in parallel. */
1144  process_syncing_tables(prepare_data.end_lsn);
1145 
1146  /*
1147  * Since we have already prepared the transaction, in a case where the
1148  * server crashes before clearing the subskiplsn, it will be left but the
1149  * transaction won't be resent. But that's okay because it's a rare case
1150  * and the subskiplsn will be cleared when finishing the next transaction.
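1151  */
1152  stop_skipping_changes();
1153  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1154 
1155  pgstat_report_activity(STATE_IDLE, NULL);
1156  reset_apply_error_context_info();
1157 }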
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1071
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1071 of file worker.c.

1072 {
1073  char gid[GIDSIZE];
1074 
1075  /*
1076  * Compute unique GID for two_phase transactions. We don't use GID of
1077  * prepared transaction sent by server as that can lead to deadlock when
1078  * we have multiple subscriptions from same node point to publications on
1079  * the same node. See comments atop worker.c
1080  */
1081  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1082  gid, sizeof(gid));
1083 
1084  /*
1085  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1086  * called within the PrepareTransactionBlock below.
1087  */
1088  if (!IsTransactionBlock())
1089  {
1090  BeginTransactionBlock();
1091  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1092  }
1093 
1094  /*
1095  * Update origin state so we can restart streaming from correct position
1096  * in case of crash.
1097  */
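1098  replorigin_session_origin_lsn = prepare_data->end_lsn;
1099  replorigin_session_origin_timestamp = prepare_data->prepare_time;
1100 
1101  PrepareTransactionBlock(gid);
1102 }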
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3941
void BeginTransactionBlock(void)
Definition: xact.c:3873

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2301 of file worker.c.

2302 {
2303  LogicalRepRelation *rel;
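2304 
2305  if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2306  return;
2307 
2308  rel = logicalrep_read_rel(s);
2309  logicalrep_relmap_update(rel);
2310 
2311  /* Also reset all entries in the partition map that refer to remoterel. */
2312  logicalrep_partmap_reset_relmap(rel);
2313 }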
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:540
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1218 of file worker.c.

1219 {
1220  LogicalRepRollbackPreparedTxnData rollback_data;
1221  char gid[GIDSIZE];
1222 
1223  logicalrep_read_rollback_prepared(s, &rollback_data);
1224  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1225 
1226  /* Compute GID for two_phase transactions. */
1227  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1228  gid, sizeof(gid));
1229 
1230  /*
1231  * It is possible that we haven't received prepare because it occurred
1232  * before walsender reached a consistent point or the two_phase was still
1233  * not enabled by that time, so in such cases, we need to skip rollback
1234  * prepared.
1235  */
1236  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1237  rollback_data.prepare_time))
1238  {
1239  /*
1240  * Update origin state so we can restart streaming from correct
1241  * position in case of crash.
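1242  */
1243  replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1244  replorigin_session_origin_timestamp = rollback_data.rollback_time;
1245 
1246  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1247  begin_replication_step();
1248  FinishPreparedTransaction(gid, false);
1249  end_replication_step();
1250  CommitTransactionCommand();
1251 
1252  clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1253  }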
1254 
1255  pgstat_report_stat(false);
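1256 
1257  store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
1258  in_remote_transaction = false;
1259 
1260  /* Process any tables that are being synchronized in parallel. */
1261  process_syncing_tables(rollback_data.rollback_end_lsn);
1262 
1263  pgstat_report_activity(STATE_IDLE, NULL);
1264  reset_apply_error_context_info();
1265 }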
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2634

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1812 of file worker.c.

1813 {
1814  TransactionId xid;
1815  TransactionId subxid;
1816  LogicalRepStreamAbortData abort_data;
1817  ParallelApplyWorkerInfo *winfo;
1818  TransApplyAction apply_action;
1819 
1820  /* Save the message before it is consumed. */
1821  StringInfoData original_msg = *s;
1822  bool toplevel_xact;
1823 
1824  if (in_streamed_transaction)
1825  ereport(ERROR,
1826  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1827  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1828 
1829  /* We receive abort information only when we can apply in parallel. */
1830  logicalrep_read_stream_abort(s, &abort_data,
1831  MyLogicalRepWorker->parallel_apply);
1832 
1833  xid = abort_data.xid;
1834  subxid = abort_data.subxid;
1835  toplevel_xact = (xid == subxid);
1836 
1837  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1838 
1839  apply_action = get_transaction_apply_action(xid, &winfo);
1840 
1841  switch (apply_action)
1842  {
1843  case TRANS_LEADER_APPLY:
1844 
1845  /*
1846  * We are in the leader apply worker and the transaction has been
1847  * serialized to file.
1848  */
1849  stream_abort_internal(xid, subxid);
1850 
1851  elog(DEBUG1, "finished processing the STREAM ABORT command");
1852  break;
1853 
1854  case TRANS_LEADER_SEND_TO_PARALLEL:
1855  Assert(winfo);
1856 
1857  /*
1858  * For the case of aborting the subtransaction, we increment the
1859  * number of streaming blocks and take the lock again before
1860  * sending the STREAM_ABORT to ensure that the parallel apply
1861  * worker will wait on the lock for the next set of changes after
1862  * processing the STREAM_ABORT message if it is not already
1863  * waiting for STREAM_STOP message.
1864  *
1865  * It is important to perform this locking before sending the
1866  * STREAM_ABORT message so that the leader can hold the lock first
1867  * and the parallel apply worker will wait for the leader to
1868  * release the lock. This is the same as what we do in
1869  * apply_handle_stream_stop. See Locking Considerations atop
1870  * applyparallelworker.c.
1871  */
1872  if (!toplevel_xact)
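1873  {
1874  pa_unlock_stream(xid, AccessExclusiveLock);
1875  pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1876  pa_lock_stream(xid, AccessExclusiveLock);
1877  }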
1878 
1879  if (pa_send_data(winfo, s->len, s->data))
1880  {
1881  /*
1882  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1883  * wait here for the parallel apply worker to finish as that
1884  * is not required to maintain the commit order and won't have
1885  * the risk of failures due to transaction dependencies and
1886  * deadlocks. However, it is possible that before the parallel
1887  * worker finishes and we clear the worker info, the xid
1888  * wraparound happens on the upstream and a new transaction
1889  * with the same xid can appear and that can lead to duplicate
1890  * entries in ParallelApplyTxnHash. Yet another problem could
1891  * be that we may have serialized the changes in partial
1892  * serialize mode and the file containing xact changes may
1893  * already exist, and after xid wraparound trying to create
1894  * the file for the same xid can lead to an error. To avoid
1895  * these problems, we decide to wait for the aborts to finish.
1896  *
1897  * Note, it is okay to not update the flush location position
1898  * for aborts as in worst case that means such a transaction
1899  * won't be sent again after restart.
1900  */
1901  if (toplevel_xact)
1902  pa_xact_finish(winfo, InvalidXLogRecPtr);
1903 
1904  break;
1905  }
1906 
1907  /*
1908  * Switch to serialize mode when we are not able to send the
1909  * change to parallel apply worker.
1910  */
1911  pa_switch_to_partial_serialize(winfo, true);
1912 
1913  /* fall through */
1914  case TRANS_LEADER_PARTIAL_SERIALIZE:
1915  Assert(winfo);
1916 
1917  /*
1918  * Parallel apply worker might have applied some changes, so write
1919  * the STREAM_ABORT message so that it can rollback the
1920  * subtransaction if needed.
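1921  */
1922  stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1923  &original_msg);
1924 
1925  if (toplevel_xact)
1926  {
1927  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1928  pa_xact_finish(winfo, InvalidXLogRecPtr);
1929  }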
1930  break;
1931 
1932  case TRANS_PARALLEL_APPLY:
1933 
1934  /*
1935  * If the parallel apply worker is applying spooled messages then
1936  * close the file before aborting.
1937  */
1938  if (toplevel_xact && stream_fd)
1939  stream_close_file();
1940 
1941  pa_stream_abort(&abort_data);
1942 
1943  /*
1944  * We need to wait after processing rollback to savepoint for the
1945  * next set of changes.
1946  *
1947  * We have a race condition here due to which we can start waiting
1948  * here when there are more chunk of streams in the queue. See
1949  * apply_handle_stream_stop.
1950  */
1951  if (!toplevel_xact)
1952  pa_decr_and_wait_stream_block();
1953 
1954  elog(DEBUG1, "finished processing the STREAM ABORT command");
1955  break;
1956 
1957  default:
1958  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1959  break;
1960  }
1961 
1962  reset_apply_error_context_info();
1963 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:419
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5050
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4322
static BufFile * stream_fd
Definition: worker.c:339
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1729
static void stream_close_file(void)
Definition: worker.c:4274
uint32 TransactionId
Definition: c.h:652
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2131 of file worker.c.

2132 {
2133  TransactionId xid;
2134  LogicalRepCommitData commit_data;
2135  ParallelApplyWorkerInfo *winfo;
2136  TransApplyAction apply_action;
2137 
2138  /* Save the message before it is consumed. */
2139  StringInfoData original_msg = *s;
2140 
2141  if (in_streamed_transaction)
2142  ereport(ERROR,
2143  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2144  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2145 
2146  xid = logicalrep_read_stream_commit(s, &commit_data);
2147  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2148 
2149  apply_action = get_transaction_apply_action(xid, &winfo);
2150 
2151  switch (apply_action)
2152  {
2153  case TRANS_LEADER_APPLY:
2154 
2155  /*
2156  * The transaction has been serialized to file, so replay all the
2157  * spooled operations.
2158  */
2159  apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2160  commit_data.commit_lsn);
2161 
2162  apply_handle_commit_internal(&commit_data);
2163 
2164  /* Unlink the files with serialized changes and subxact info. */
2165  stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2166 
2167  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2168  break;
2169 
2170  case TRANS_LEADER_SEND_TO_PARALLEL:
2171  Assert(winfo);
2172 
2173  if (pa_send_data(winfo, s->len, s->data))
2174  {
2175  /* Finish processing the streaming transaction. */
2176  pa_xact_finish(winfo, commit_data.end_lsn);
2177  break;
2178  }
2179 
2180  /*
2181  * Switch to serialize mode when we are not able to send the
2182  * change to parallel apply worker.
2183  */
2184  pa_switch_to_partial_serialize(winfo, true);
2185 
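2186  /* fall through */
2187  case TRANS_LEADER_PARTIAL_SERIALIZE:
2188  Assert(winfo);
2189 
2190  stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2191  &original_msg);
2192 
2193  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2194 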
2195  /* Finish processing the streaming transaction. */
2196  pa_xact_finish(winfo, commit_data.end_lsn);
2197  break;
2198 
2199  case TRANS_PARALLEL_APPLY:
2200 
2201  /*
2202  * If the parallel apply worker is applying spooled messages then
2203  * close the file before committing.
2204  */
2205  if (stream_fd)
2206  stream_close_file();
2207 
2208  apply_handle_commit_internal(&commit_data);
2209 
2210  MyParallelShared->last_commit_end = XactLastCommitEnd;
2211 
2212  /*
2213  * It is important to set the transaction state as finished before
2214  * releasing the lock. See pa_wait_for_xact_finish.
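2215  */
2216  pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2217  pa_unlock_transaction(xid, AccessExclusiveLock);
2218 
2219  pa_reset_subtrans();
2220 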
2221  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2222  break;
2223 
2224  default:
2225  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2226  break;
2227  }
2228 
2229  /* Process any tables that are being synchronized in parallel. */
2230  process_syncing_tables(commit_data.end_lsn);
2231 
2232  pgstat_report_activity(STATE_IDLE, NULL);
2233 
2234  reset_apply_error_context_info();
2235 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4205
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2001
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert, LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1271 of file worker.c.

1272 {
1273  LogicalRepPreparedTxnData prepare_data;
1274  ParallelApplyWorkerInfo *winfo;
1275  TransApplyAction apply_action;
1276 
1277  /* Save the message before it is consumed. */
1278  StringInfoData original_msg = *s;
1279 
1280  if (in_streamed_transaction)
1281  ereport(ERROR,
1282  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1283  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1284 
1285  /* Tablesync should never receive prepare. */
1286  if (am_tablesync_worker())
1287  ereport(ERROR,
1288  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1289  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1290 
1291  logicalrep_read_stream_prepare(s, &prepare_data);
1292  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1293 
1294  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1295 
1296  switch (apply_action)
1297  {
1298  case TRANS_LEADER_APPLY:
1299 
1300  /*
1301  * The transaction has been serialized to file, so replay all the
1302  * spooled operations.
1303  */
1304  apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1305  prepare_data.xid, prepare_data.prepare_lsn);
1306 
1307  /* Mark the transaction as prepared. */
1308  apply_handle_prepare_internal(&prepare_data);
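1309 
1310  CommitTransactionCommand();
1311 
1312  store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1313 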
1314  in_remote_transaction = false;
1315 
1316  /* Unlink the files with serialized changes and subxact info. */
1317  stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1318 
1319  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1320  break;
1321 
1322  case TRANS_LEADER_SEND_TO_PARALLEL:
1323  Assert(winfo);
1324 
1325  if (pa_send_data(winfo, s->len, s->data))
1326  {
1327  /* Finish processing the streaming transaction. */
1328  pa_xact_finish(winfo, prepare_data.end_lsn);
1329  break;
1330  }
1331 
1332  /*
1333  * Switch to serialize mode when we are not able to send the
1334  * change to parallel apply worker.
1335  */
1336  pa_switch_to_partial_serialize(winfo, true);
1337 
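1338  /* fall through */
1339  case TRANS_LEADER_PARTIAL_SERIALIZE:
1340  Assert(winfo);
1341 
1342  stream_open_and_write_change(prepare_data.xid,
1343  LOGICAL_REP_MSG_STREAM_PREPARE,
1344  &original_msg);
1345 
1346  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1347 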
1348  /* Finish processing the streaming transaction. */
1349  pa_xact_finish(winfo, prepare_data.end_lsn);
1350  break;
1351 
1352  case TRANS_PARALLEL_APPLY:
1353 
1354  /*
1355  * If the parallel apply worker is applying spooled messages then
1356  * close the file before preparing.
1357  */
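1358  if (stream_fd)
1359  stream_close_file();
1360 
1361  begin_replication_step();
1362 
1363  /* Mark the transaction as prepared. */
1364  apply_handle_prepare_internal(&prepare_data);
1365 
1366  end_replication_step();
1367 
1368  CommitTransactionCommand();
1369 
1370  MyParallelShared->last_commit_end = XactLastCommitEnd;
1371 
1372  pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1373  pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1374 
1375  pa_reset_subtrans();
1376 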
1377  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1378  break;
1379 
1380  default:
1381  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1382  break;
1383  }
1384 
1385  pgstat_report_stat(false);
1386 
1387  /* Process any tables that are being synchronized in parallel. */
1388  process_syncing_tables(prepare_data.end_lsn);
1389 
1390  /*
1391  * Similar to prepare case, the subskiplsn could be left in a case of
1392  * server crash but it's okay. See the comments in apply_handle_prepare().
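1393  */
1394  stop_skipping_changes();
1395  clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1396 
1397  pgstat_report_activity(STATE_IDLE, NULL);
1398 
1399  reset_apply_error_context_info();
1400 }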
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert, begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, XactLastCommitEnd, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1467 of file worker.c.

/*
 * apply_handle_stream_start -- handle a STREAM START message.
 *
 * s: the incoming message buffer.  A by-value copy of its StringInfoData
 *    header is kept in original_msg before s is parsed.
 *
 * Visible steps: raise a protocol-violation error for a duplicate STREAM
 * START or for an invalid transaction ID, set in_streamed_transaction, read
 * the top-level XID and the first_segment flag with
 * logicalrep_read_stream_start(), then dispatch on the action returned by
 * get_transaction_apply_action().  An action not handled by the switch
 * raises ERROR "unexpected apply action".
 *
 * NOTE(review): this listing is an extract in which several numbered source
 * lines are absent (for example 1476, 1482, 1490, and 1505/1518/1554, which
 * are presumably the case labels).  The comments added here describe only
 * what the remaining lines show.
 */
1468 {
1469  bool first_segment;
1470  ParallelApplyWorkerInfo *winfo;
1471  TransApplyAction apply_action;
1472 
1473  /* Save the message before it is consumed. */
1474  StringInfoData original_msg = *s;
1475 
/* NOTE(review): the condition guarding this error (line 1476) is not in this extract. */
1477  ereport(ERROR,
1478  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1479  errmsg_internal("duplicate STREAM START message")));
1480 
1481  /* There must not be an active streaming transaction. */
1483 
1484  /* notify handle methods we're processing a remote transaction */
1485  in_streamed_transaction = true;
1486 
1487  /* extract XID of the top-level transaction */
1488  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1489 
/* NOTE(review): the validity test on stream_xid (line 1490) is not in this extract. */
1491  ereport(ERROR,
1492  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1493  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1494 
1496 
1497  /* Try to allocate a worker for the streaming transaction. */
1498  if (first_segment)
1500 
/* winfo is an output of this call; the parallel-apply branches below assert it is set. */
1501  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1502 
1503  switch (apply_action)
1504  {
1506 
1507  /*
1508  * Function stream_start_internal starts a transaction. This
1509  * transaction will be committed on the stream stop unless it is a
1510  * tablesync worker in which case it will be committed after
1511  * processing all the messages. We need this transaction for
1512  * handling the BufFile, used for serializing the streaming data
1513  * and subxact info.
1514  */
1515  stream_start_internal(stream_xid, first_segment);
1516  break;
1517 
1519  Assert(winfo);
1520 
1521  /*
1522  * Once we start serializing the changes, the parallel apply
1523  * worker will wait for the leader to release the stream lock
1524  * until the end of the transaction. So, we don't need to release
1525  * the lock or increment the stream count in that case.
1526  */
1527  if (pa_send_data(winfo, s->len, s->data))
1528  {
1529  /*
1530  * Unlock the shared object lock so that the parallel apply
1531  * worker can continue to receive changes.
1532  */
1533  if (!first_segment)
1535 
1536  /*
1537  * Increment the number of streaming blocks waiting to be
1538  * processed by parallel apply worker.
1539  */
1541 
1542  /* Cache the parallel apply worker for this transaction. */
1544  break;
1545  }
1546 
1547  /*
1548  * Switch to serialize mode when we are not able to send the
1549  * change to parallel apply worker.
1550  */
1551  pa_switch_to_partial_serialize(winfo, !first_segment);
1552 
1553  /* fall through */
1555  Assert(winfo);
1556 
1557  /*
1558  * Open the spool file unless it was already opened when switching
1559  * to serialize mode. The transaction started in
1560  * stream_start_internal will be committed on the stream stop.
1561  */
/*
 * apply_action is not reassigned in the visible lines, so it still equals
 * TRANS_LEADER_SEND_TO_PARALLEL when control arrives here via the fall
 * through above; in that case stream_start_internal() is skipped.
 */
1562  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1563  stream_start_internal(stream_xid, first_segment);
1564 
1566 
1567  /* Cache the parallel apply worker for this transaction. */
1569  break;
1570 
1571  case TRANS_PARALLEL_APPLY:
1572  if (first_segment)
1573  {
1574  /* Hold the lock until the end of the transaction. */
1577 
1578  /*
1579  * Signal the leader apply worker, as it may be waiting for
1580  * us.
1581  */
1583  }
1584 
1586  break;
1587 
1588  default:
1589  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1590  break;
1591  }
1592 
1594 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:315
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4292
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1429
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:676
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1626 of file worker.c.

/*
 * apply_handle_stream_stop -- handle a STREAM STOP message.
 *
 * s: the incoming message buffer; its len/data are forwarded with
 *    pa_send_data() on the path that sends to a parallel apply worker.
 *
 * Visible steps: raise a protocol-violation error for a STREAM STOP that
 * has no matching STREAM START, look up the apply action for stream_xid
 * with get_transaction_apply_action(), dispatch on it, and finally clear
 * in_streamed_transaction.  An action not handled by the switch raises
 * ERROR "unexpected apply action".
 *
 * NOTE(review): this listing is an extract in which several numbered source
 * lines are absent (for example 1631, 1640-1641, 1644, 1653, 1668-1671 and
 * 1716-1721).  The comments added here describe only what the remaining
 * lines show.
 */
1627 {
1628  ParallelApplyWorkerInfo *winfo;
1629  TransApplyAction apply_action;
1630 
/* NOTE(review): the condition guarding this error (line 1631) is not in this extract. */
1632  ereport(ERROR,
1633  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1634  errmsg_internal("STREAM STOP message without STREAM START")));
1635 
1636  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1637 
1638  switch (apply_action)
1639  {
1642  break;
1643 
1645  Assert(winfo);
1646 
1647  /*
1648  * Lock before sending the STREAM_STOP message so that the leader
1649  * can hold the lock first and the parallel apply worker will wait
1650  * for leader to release the lock. See Locking Considerations atop
1651  * applyparallelworker.c.
1652  */
1654 
1655  if (pa_send_data(winfo, s->len, s->data))
1656  {
1658  break;
1659  }
1660 
1661  /*
1662  * Switch to serialize mode when we are not able to send the
1663  * change to parallel apply worker.
1664  */
1665  pa_switch_to_partial_serialize(winfo, true);
1666 
1667  /* fall through */
1672  break;
1673 
1674  case TRANS_PARALLEL_APPLY:
1675  elog(DEBUG1, "applied %u changes in the streaming chunk",
1677 
1678  /*
1679  * By the time parallel apply worker is processing the changes in
1680  * the current streaming block, the leader apply worker may have
1681  * sent multiple streaming blocks. This can lead to the parallel apply
1682  * worker starting to wait even when there are more chunks of streams
1683  * in the queue. So, try to lock only if there is no message left
1684  * in the queue. See Locking Considerations atop
1685  * applyparallelworker.c.
1686  *
1687  * Note that here we have a race condition where we can start
1688  * waiting even when there are pending streaming chunks. This can
1689  * happen if the leader sends another streaming block and acquires
1690  * the stream lock again after the parallel apply worker checks
1691  * that there is no pending streaming block and before it actually
1692  * starts waiting on a lock. We can handle this case by not
1693  * allowing the leader to increment the stream block count during
1694  * the time parallel apply worker acquires the lock but it is not
1695  * clear whether that is worth the complexity.
1696  *
1697  * Now, if this missed chunk contains rollback to savepoint, then
1698  * there is a risk of deadlock which probably shouldn't happen
1699  * after restart.
1700  */
1702  break;
1703 
1704  default:
1705  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1706  break;
1707  }
1708 
/* The streamed chunk is finished, whichever branch handled it. */
1709  in_streamed_transaction = false;
1711 
1712  /*
1713  * The parallel apply worker could be in a transaction in which case we
1714  * need to report the state as STATE_IDLEINTRANSACTION.
1715  */
1718  else
1720 
1722 }
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1603
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933

References AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3144 of file worker.c.

/*
 * apply_handle_truncate -- handle a TRUNCATE message.
 *
 * s: the incoming message buffer, parsed by logicalrep_read_truncate().
 *
 * Visible steps: return early when changes are being skipped (the second
 * half of that condition is on a line missing from this extract); read the
 * list of remote relation ids plus the cascade and restart_seqs flags; open
 * each relation with AccessExclusiveLock and drop those for which
 * should_apply_changes_for_rel() is false; for partitioned tables also
 * collect every inheritor returned by find_all_inheritors(); pass the
 * collected lists to ExecuteTruncateGuts() with DROP_RESTRICT; then close
 * the collected relations.
 *
 * List roles as used below:
 *   remote_rels    LogicalRepRelMapEntry pointers for the accepted relations
 *   rels           local Relation handles (parents and partitions)
 *   part_rels      partition Relation handles opened here via table_open()
 *   relids         local OIDs, also used to avoid adding a child twice
 *   relids_logged  OIDs handed to ExecuteTruncateGuts() as relids_logged
 *
 * NOTE(review): numbered lines 3162, 3165, 3186, 3189, 3224, 3250, 3255 and
 * 3264 are absent from this extract; in particular the last argument and
 * closing parenthesis of the ExecuteTruncateGuts() call are not shown.
 */
3145 {
3146  bool cascade = false;
3147  bool restart_seqs = false;
3148  List *remote_relids = NIL;
3149  List *remote_rels = NIL;
3150  List *rels = NIL;
3151  List *part_rels = NIL;
3152  List *relids = NIL;
3153  List *relids_logged = NIL;
3154  ListCell *lc;
3155  LOCKMODE lockmode = AccessExclusiveLock;
3156 
3157  /*
3158  * Quick return if we are skipping data modification changes or handling
3159  * streamed transactions.
3160  */
3161  if (is_skipping_changes() ||
3163  return;
3164 
3166 
3167  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3168 
3169  foreach(lc, remote_relids)
3170  {
3171  LogicalRepRelId relid = lfirst_oid(lc);
3172  LogicalRepRelMapEntry *rel;
3173 
3174  rel = logicalrep_rel_open(relid, lockmode);
3175  if (!should_apply_changes_for_rel(rel))
3176  {
3177  /*
3178  * The relation can't become interesting in the middle of the
3179  * transaction so it's safe to unlock it.
3180  */
3181  logicalrep_rel_close(rel, lockmode);
3182  continue;
3183  }
3184 
3185  remote_rels = lappend(remote_rels, rel);
3187  rels = lappend(rels, rel->localrel);
3188  relids = lappend_oid(relids, rel->localreloid);
3190  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3191 
3192  /*
3193  * Truncate partitions if we got a message to truncate a partitioned
3194  * table.
3195  */
3196  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3197  {
3198  ListCell *child;
3199  List *children = find_all_inheritors(rel->localreloid,
3200  lockmode,
3201  NULL);
3202 
3203  foreach(child, children)
3204  {
3205  Oid childrelid = lfirst_oid(child);
3206  Relation childrel;
3207 
/* Skip children already collected (including the parent itself, added above). */
3208  if (list_member_oid(relids, childrelid))
3209  continue;
3210 
3211  /* find_all_inheritors already got lock */
3212  childrel = table_open(childrelid, NoLock);
3213 
3214  /*
3215  * Ignore temp tables of other backends. See similar code in
3216  * ExecuteTruncate().
3217  */
3218  if (RELATION_IS_OTHER_TEMP(childrel))
3219  {
/*
 * Closed with lockmode here, unlike the NoLock closes after the truncate;
 * presumably this gives up the lock taken by find_all_inheritors() --
 * TODO confirm against table_close().
 */
3220  table_close(childrel, lockmode);
3221  continue;
3222  }
3223 
3225  rels = lappend(rels, childrel);
3226  part_rels = lappend(part_rels, childrel);
3227  relids = lappend_oid(relids, childrelid);
3228  /* Log this relation only if needed for logical decoding */
3229  if (RelationIsLogicallyLogged(childrel))
3230  relids_logged = lappend_oid(relids_logged, childrelid);
3231  }
3232  }
3233  }
3234 
3235  /*
3236  * Even if we used CASCADE on the upstream primary we explicitly default
3237  * to replaying changes without further cascading. This might be later
3238  * changeable with a user specified option.
3239  *
3240  * MySubscription->runasowner tells us whether we want to execute
3241  * replication actions as the subscription owner; the last argument to
3242  * TruncateGuts tells it whether we want to switch to the table owner.
3243  * Those are exactly opposite conditions.
3244  */
3245  ExecuteTruncateGuts(rels,
3246  relids,
3247  relids_logged,
3248  DROP_RESTRICT,
3249  restart_seqs,
3251  foreach(lc, remote_rels)
3252  {
3253  LogicalRepRelMapEntry *rel = lfirst(lc);
3254 
3256  }
/* Partitions opened above with table_open() are closed with NoLock. */
3257  foreach(lc, part_rels)
3258  {
3259  Relation rel = lfirst(lc);
3260 
3261  table_close(rel, NoLock);
3262  }
3263 
3265 }
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2336
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1931

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 2893 of file worker.c.

/*
 * apply_handle_tuple_routing -- apply an INSERT, UPDATE or DELETE whose
 * target is a partitioned table by routing it to the matching partition.
 *
 * edata:      apply execution state; supplies estate, targetRel and
 *             targetRelInfo, and receives the mtstate and proute built here.
 * remoteslot: the "search tuple" used to find the partition.  On the
 *             UPDATE path that moves the row to another partition it is
 *             overwritten with the updated tuple in the parent's rowtype.
 * newtup:     new column data; only passed to slot_modify_data() for
 *             CMD_UPDATE.
 * operation:  CMD_INSERT, CMD_UPDATE or CMD_DELETE; any other value raises
 *             ERROR "unrecognized CmdType".
 *
 * part_entry is only opened for CMD_UPDATE and CMD_DELETE, which are the
 * only branches that dereference it.
 *
 * NOTE(review): numbered lines 2926, 2938, 3022, 3048, 3069, 3083, 3093 and
 * 3107 are absent from this extract.  The assignments to oldctx that pair
 * with the MemoryContextSwitchTo(oldctx) calls below are presumably on
 * those lines.
 */
2897 {
2898  EState *estate = edata->estate;
2899  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2900  ResultRelInfo *relinfo = edata->targetRelInfo;
2901  Relation parentrel = relinfo->ri_RelationDesc;
2902  ModifyTableState *mtstate;
2903  PartitionTupleRouting *proute;
2904  ResultRelInfo *partrelinfo;
2905  Relation partrel;
2906  TupleTableSlot *remoteslot_part;
2907  TupleConversionMap *map;
2908  MemoryContext oldctx;
2909  LogicalRepRelMapEntry *part_entry = NULL;
2910  AttrMap *attrmap = NULL;
2911 
2912  /* ModifyTableState is needed for ExecFindPartition(). */
2913  edata->mtstate = mtstate = makeNode(ModifyTableState);
2914  mtstate->ps.plan = NULL;
2915  mtstate->ps.state = estate;
2916  mtstate->operation = operation;
2917  mtstate->resultRelInfo = relinfo;
2918 
2919  /* ... as is PartitionTupleRouting. */
2920  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2921 
2922  /*
2923  * Find the partition to which the "search tuple" belongs.
2924  */
2925  Assert(remoteslot != NULL);
2927  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2928  remoteslot, estate);
2929  Assert(partrelinfo != NULL);
2930  partrel = partrelinfo->ri_RelationDesc;
2931 
2932  /*
2933  * Check for supported relkind. We need this since partitions might be of
2934  * unsupported relkinds; and the set of partitions can change, so checking
2935  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2936  */
2937  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2939  RelationGetRelationName(partrel));
2940 
2941  /*
2942  * To perform any of the operations below, the tuple must match the
2943  * partition's rowtype. Convert if needed or just copy, using a dedicated
2944  * slot to store the tuple in any case.
2945  */
2946  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2947  if (remoteslot_part == NULL)
2948  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2949  map = ExecGetRootToChildMap(partrelinfo, estate);
2950  if (map != NULL)
2951  {
/* attrmap is kept so it can be passed to logicalrep_partition_open() below. */
2952  attrmap = map->attrMap;
2953  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2954  remoteslot_part);
2955  }
2956  else
2957  {
2958  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2959  slot_getallattrs(remoteslot_part);
2960  }
2961  MemoryContextSwitchTo(oldctx);
2962 
2963  /* Check if we can do the update or delete on the leaf partition. */
2964  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2965  {
2966  part_entry = logicalrep_partition_open(relmapentry, partrel,
2967  attrmap);
2968  check_relation_updatable(part_entry);
2969  }
2970 
2971  switch (operation)
2972  {
2973  case CMD_INSERT:
2974  apply_handle_insert_internal(edata, partrelinfo,
2975  remoteslot_part);
2976  break;
2977 
2978  case CMD_DELETE:
2979  apply_handle_delete_internal(edata, partrelinfo,
2980  remoteslot_part,
2981  part_entry->localindexoid);
2982  break;
2983 
2984  case CMD_UPDATE:
2985 
2986  /*
2987  * For UPDATE, depending on whether or not the updated tuple
2988  * satisfies the partition's constraint, perform a simple UPDATE
2989  * of the partition or move the updated tuple into a different
2990  * suitable partition.
2991  */
2992  {
2993  TupleTableSlot *localslot;
2994  ResultRelInfo *partrelinfo_new;
2995  Relation partrel_new;
2996  bool found;
2997 
2998  /* Get the matching local tuple from the partition. */
2999  found = FindReplTupleInLocalRel(edata, partrel,
3000  &part_entry->remoterel,
3001  part_entry->localindexoid,
3002  remoteslot_part, &localslot);
3003  if (!found)
3004  {
3005  /*
3006  * The tuple to be updated could not be found. Do nothing
3007  * except for emitting a log message.
3008  *
3009  * XXX should this be promoted to ereport(LOG) perhaps?
3010  */
3011  elog(DEBUG1,
3012  "logical replication did not find row to be updated "
3013  "in replication target relation's partition \"%s\"",
3014  RelationGetRelationName(partrel));
/* Leaves the function directly from inside the switch; nothing more is done for this row. */
3015  return;
3016  }
3017 
3018  /*
3019  * Apply the update to the local tuple, putting the result in
3020  * remoteslot_part.
3021  */
3023  slot_modify_data(remoteslot_part, localslot, part_entry,
3024  newtup);
3025  MemoryContextSwitchTo(oldctx);
3026 
3027  /*
3028  * Does the updated tuple still satisfy the current
3029  * partition's constraint?
3030  */
3031  if (!partrel->rd_rel->relispartition ||
3032  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3033  false))
3034  {
3035  /*
3036  * Yes, so simply UPDATE the partition. We don't call
3037  * apply_handle_update_internal() here, which would
3038  * normally do the following work, to avoid repeating some
3039  * work already done above to find the local tuple in the
3040  * partition.
3041  */
3042  EPQState epqstate;
3043 
3044  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3045  ExecOpenIndices(partrelinfo, false);
3046 
3047  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3049  ACL_UPDATE);
3050  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3051  localslot, remoteslot_part);
3052  ExecCloseIndices(partrelinfo);
3053  EvalPlanQualEnd(&epqstate);
3054  }
3055  else
3056  {
3057  /* Move the tuple into the new partition. */
3058 
3059  /*
3060  * New partition will be found using tuple routing, which
3061  * can only occur via the parent table. We might need to
3062  * convert the tuple to the parent's rowtype. Note that
3063  * this is the tuple found in the partition, not the
3064  * original search tuple received by this function.
3065  */
3066  if (map)
3067  {
3068  TupleConversionMap *PartitionToRootMap =
3070  RelationGetDescr(parentrel));
3071 
3072  remoteslot =
3073  execute_attr_map_slot(PartitionToRootMap->attrMap,
3074  remoteslot_part, remoteslot);
3075  }
3076  else
3077  {
3078  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3079  slot_getallattrs(remoteslot);
3080  }
3081 
3082  /* Find the new partition. */
3084  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3085  proute, remoteslot,
3086  estate);
3087  MemoryContextSwitchTo(oldctx);
3088  Assert(partrelinfo_new != partrelinfo);
3089  partrel_new = partrelinfo_new->ri_RelationDesc;
3090 
3091  /* Check that new partition also has supported relkind. */
3092  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3094  RelationGetRelationName(partrel_new));
3095 
3096  /* DELETE old tuple found in the old partition. */
3097  apply_handle_delete_internal(edata, partrelinfo,
3098  localslot,
3099  part_entry->localindexoid);
3100 
3101  /* INSERT new tuple into the new partition. */
3102 
3103  /*
3104  * Convert the replacement tuple to match the destination
3105  * partition rowtype.
3106  */
3108  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3109  if (remoteslot_part == NULL)
3110  remoteslot_part = table_slot_create(partrel_new,
3111  &estate->es_tupleTable);
3112  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3113  if (map != NULL)
3114  {
3115  remoteslot_part = execute_attr_map_slot(map->attrMap,
3116  remoteslot,
3117  remoteslot_part);
3118  }
3119  else
3120  {
/*
 * NOTE(review): slot_getallattrs() is called on the source slot
 * (remoteslot) here, whereas the equivalent copy path near the top of
 * this function calls it on the destination (remoteslot_part) --
 * confirm which one is intended.
 */
3121  remoteslot_part = ExecCopySlot(remoteslot_part,
3122  remoteslot);
3123  slot_getallattrs(remoteslot);
3124  }
3125  MemoryContextSwitchTo(oldctx);
3126  apply_handle_insert_internal(edata, partrelinfo_new,
3127  remoteslot_part);
3128  }
3129  }
3130  break;
3131 
3132  default:
3133  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3134  break;
3135  }
3136 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:898
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1792
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1232
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
@ CMD_UPDATE
Definition: nodes.h:266
#define makeNode(_type_)
Definition: nodes.h:155
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetNamespace(relation)
Definition: rel.h:546
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:602
PartitionTupleRouting * proute
Definition: worker.c:215
ModifyTableState * mtstate
Definition: worker.c:214
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:669
CmdType operation
Definition: execnodes.h:1356
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1360
PlanState ps
Definition: execnodes.h:1355
Plan * plan
Definition: execnodes.h:1117
EState * state
Definition: execnodes.h:1119
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:583
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, LogicalRepRelMapEntry::localindexoid, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2324 of file worker.c.

/*
 * apply_handle_type -- handle a TYPE message.
 *
 * s: the incoming message buffer.
 *
 * The message is parsed into a local LogicalRepTyp with
 * logicalrep_read_typ(); the parsed value is not used afterwards in the
 * visible lines.
 *
 * NOTE(review): the condition guarding the early return (line 2328) is not
 * in this extract; the page's reference list suggests it is a
 * handle_streamed_transaction() call -- confirm against the full source.
 */
2325 {
2326  LogicalRepTyp typ;
2327 
2329  return;
2330 
2331  logicalrep_read_typ(s, &typ);
2332 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2520 of file worker.c.

/*
 * apply_handle_update -- handle an UPDATE message.
 *
 * s: the incoming message buffer, parsed by logicalrep_read_update().
 *
 * Visible steps: return early when changes are being skipped (the second
 * half of that condition is on a line missing from this extract); read the
 * relation id and the old/new tuples; open the relation with
 * RowExclusiveLock and return if should_apply_changes_for_rel() is false;
 * switch to the table owner unless MySubscription->runasowner is set; build
 * executor state and a virtual slot; record the changed columns in
 * updatedCols; build the search tuple from the old tuple when one was sent,
 * otherwise from the new tuple; apply the update; then tear down the
 * executor state and restore the user context.
 *
 * NOTE(review): numbered lines 2540, 2543, 2554-2555, 2560, 2563, 2591,
 * 2600, 2605, 2612, 2615, 2621, 2626 and 2628 are absent from this extract,
 * including the names of the two functions called in the final if/else.
 */
2521 {
2522  LogicalRepRelMapEntry *rel;
2523  LogicalRepRelId relid;
2524  UserContext ucxt;
2525  ApplyExecutionData *edata;
2526  EState *estate;
2527  LogicalRepTupleData oldtup;
2528  LogicalRepTupleData newtup;
2529  bool has_oldtup;
2530  TupleTableSlot *remoteslot;
2531  RTEPermissionInfo *target_perminfo;
2532  MemoryContext oldctx;
2533  bool run_as_owner;
2534 
2535  /*
2536  * Quick return if we are skipping data modification changes or handling
2537  * streamed transactions.
2538  */
2539  if (is_skipping_changes() ||
2541  return;
2542 
2544 
2545  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2546  &newtup);
2547  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2548  if (!should_apply_changes_for_rel(rel))
2549  {
2550  /*
2551  * The relation can't become interesting in the middle of the
2552  * transaction so it's safe to unlock it.
2553  */
2556  return;
2557  }
2558 
2559  /* Set relation for error callback */
2561 
2562  /* Check if we can do the update. */
2564 
2565  /*
2566  * Make sure that any user-supplied code runs as the table owner, unless
2567  * the user has opted out of that behavior.
2568  */
/* run_as_owner is cached so the matching RestoreUserContext() below uses the same decision. */
2569  run_as_owner = MySubscription->runasowner;
2570  if (!run_as_owner)
2571  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2572 
2573  /* Initialize the executor state. */
2574  edata = create_edata_for_relation(rel);
2575  estate = edata->estate;
2576  remoteslot = ExecInitExtraTupleSlot(estate,
2577  RelationGetDescr(rel->localrel),
2578  &TTSOpsVirtual);
2579 
2580  /*
2581  * Populate updatedCols so that per-column triggers can fire, and so
2582  * executor can correctly pass down indexUnchanged hint. This could
2583  * include more columns than were actually changed on the publisher
2584  * because the logical replication protocol doesn't contain that
2585  * information. But it would for example exclude columns that only exist
2586  * on the subscriber, since we are not touching those.
2587  */
2588  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2589  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2590  {
/* A negative entry in attrmap->attnums is treated below as "no remote column for local column i". */
2592  int remoteattnum = rel->attrmap->attnums[i];
2593 
2594  if (!att->attisdropped && remoteattnum >= 0)
2595  {
2596  Assert(remoteattnum < newtup.ncols);
2597  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2598  target_perminfo->updatedCols =
2599  bms_add_member(target_perminfo->updatedCols,
2601  }
2602  }
2603 
2604  /* Build the search tuple. */
2606  slot_store_data(remoteslot, rel,
2607  has_oldtup ? &oldtup : &newtup);
2608  MemoryContextSwitchTo(oldctx);
2609 
2610  /* For a partitioned table, apply update to correct partition. */
2611  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2613  remoteslot, &newtup, CMD_UPDATE);
2614  else
2616  remoteslot, &newtup, rel->localindexoid);
2617 
2618  finish_edata(edata);
2619 
2620  /* Reset relation for error callback */
2622 
2623  if (!run_as_owner)
2624  RestoreUserContext(&ucxt);
2625 
2627 
2629 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2637
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
int i
Definition: isn.c:73
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:632
Bitmapset * updatedCols
Definition: parsenodes.h:1299
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2637 of file worker.c.

2642 {
2643  EState *estate = edata->estate;
2644  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2645  Relation localrel = relinfo->ri_RelationDesc;
2646  EPQState epqstate;
2647  TupleTableSlot *localslot;
2648  bool found;
2649  MemoryContext oldctx;
2650 
2651  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2652  ExecOpenIndices(relinfo, false);
2653 
2654  found = FindReplTupleInLocalRel(edata, localrel,
2655  &relmapentry->remoterel,
2656  localindexoid,
2657  remoteslot, &localslot);
2658  ExecClearTuple(remoteslot);
2659 
2660  /*
2661  * Tuple found.
2662  *
2663  * Note this will fail if there are other conflicting unique indexes.
2664  */
2665  if (found)
2666  {
2667  /* Process and store remote tuple in the slot */
2668  oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2669  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2670  MemoryContextSwitchTo(oldctx);
2671 
2672  EvalPlanQualSetSlot(&epqstate, remoteslot);
2673 
2674  /* Do the actual update. */
2675  TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2676  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2677  remoteslot);
2678  }
2679  else
2680  {
2681  /*
2682  * The tuple to be updated could not be found. Do nothing except for
2683  * emitting a log message.
2684  *
2685  * XXX should this be promoted to ereport(LOG) perhaps?
2686  */
2687  elog(DEBUG1,
2688  "logical replication did not find row to be updated "
2689  "in replication target relation \"%s\"",
2690  RelationGetRelationName(localrel));
2691  }
2692 
2693  /* Cleanup. */
2694  ExecCloseIndices(relinfo);
2695  EvalPlanQualEnd(&epqstate);
2696 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454

References ACL_UPDATE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2001 of file worker.c.

2003 {
2004  int nchanges;
2005  char path[MAXPGPATH];
2006  char *buffer = NULL;
2007  MemoryContext oldcxt;
2008  ResourceOwner oldowner;
2009  int fileno;
2010  off_t offset;
2011 
2012  if (!am_parallel_apply_worker())
2013  maybe_start_skipping_changes(lsn);
2014 
2015  /* Make sure we have an open transaction */
2016  begin_replication_step();
2017 
2018  /*
2019  * Allocate file handle and memory required to process all the messages in
2020  * TopTransactionContext to avoid them getting reset after each message is
2021  * processed.
2022  */
2023  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2024 
2025  /* Open the spool file for the committed/prepared transaction */
2026  changes_filename(path, MyLogicalRepWorker->subid, xid);
2027  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2028 
2029  /*
2030  * Make sure the file is owned by the toplevel transaction so that the
2031  * file will not be accidentally closed when aborting a subtransaction.
2032  */
2033  oldowner = CurrentResourceOwner;
2034  CurrentResourceOwner = TopTransactionResourceOwner;
2035 
2036  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2037 
2038  CurrentResourceOwner = oldowner;
2039 
2040  buffer = palloc(BLCKSZ);
2041 
2042  MemoryContextSwitchTo(oldcxt);
2043 
2044  remote_final_lsn = lsn;
2045 
2046  /*
2047  * Make sure the handle apply_dispatch methods are aware we're in a remote
2048  * transaction.
2049  */
2050  in_remote_transaction = true;
2051  pgstat_report_activity(STATE_RUNNING, NULL);
2052 
2053  end_replication_step();
2054 
2055  /*
2056  * Read the entries one by one and pass them through the same logic as in
2057  * apply_dispatch.
2058  */
2059  nchanges = 0;
2060  while (true)
2061  {
2062  StringInfoData s2;
2063  size_t nbytes;
2064  int len;
2065 
2066  CHECK_FOR_INTERRUPTS();
2067 
2068  /* read length of the on-disk record */
2069  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2070 
2071  /* have we reached end of the file? */
2072  if (nbytes == 0)
2073  break;
2074 
2075  /* do we have a correct length? */
2076  if (len <= 0)
2077  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2078  len, path);
2079 
2080  /* make sure we have sufficiently large buffer */
2081  buffer = repalloc(buffer, len);
2082 
2083  /* and finally read the data into the buffer */
2084  BufFileReadExact(stream_fd, buffer, len);
2085 
2086  BufFileTell(stream_fd, &fileno, &offset);
2087 
2088  /* init a stringinfo using the buffer and call apply_dispatch */
2089  initReadOnlyStringInfo(&s2, buffer, len);
2090 
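2091  /* Ensure we are reading the data into our memory context. */
2092  oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2093 
2094  apply_dispatch(&s2);
2095 
2096  MemoryContextReset(ApplyMessageContext);
2097 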
2098  MemoryContextSwitchTo(oldcxt);
2099 
2100  nchanges++;
2101 
2102  /*
2103  * It is possible the file has been closed because we have processed
2104  * the transaction end message like stream_commit in which case that
2105  * must be the last message.
2106  */
2107  if (!stream_fd)
2108  {
2109  ensure_last_message(stream_fileset, xid, fileno, offset);
2110  break;
2111  }
2112 
2113  if (nchanges % 1000 == 0)
2114  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2115  nchanges, path);
2116  }
2117 
2118  if (stream_fd)
2119  stream_close_file();
2120 
2121  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2122  nchanges, path);
2123 
2124  return;
2125 }
MemoryContext ApplyMessageContext
Definition: worker.c:290
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4191
void apply_dispatch(StringInfo s)
Definition: worker.c:3272
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1969
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1540
void * palloc(Size size)
Definition: mcxt.c:1316
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 3831 of file worker.c.

3832 {
3833  if (am_parallel_apply_worker())
3834  {
3835  /*
3836  * Don't stop the parallel apply worker as the leader will detect the
3837  * subscription parameter change and restart logical replication later
3838  * anyway. This also prevents the leader from reporting errors when
3839  * trying to communicate with a stopped parallel apply worker, which
3840  * would accidentally disable subscriptions if disable_on_error was
3841  * set.
3842  */
3843  return;
3844  }
3845 
3846  /*
3847  * Reset the last-start time for this apply worker so that the launcher
3848  * will restart it without waiting for wal_retrieve_retry_interval if the
3849  * subscription is still active, and so that we won't leak that hash table
3850  * entry if it isn't.
3851  */
3852  if (am_leader_apply_worker())
3853  ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3854 
3855  proc_exit(0);
3856 }
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1075
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), and maybe_reread_subscription().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4685 of file worker.c.

4686 {
4687  int worker_slot = DatumGetInt32(main_arg);
4688 
4689  InitializingApplyWorker = true;
4690 
4691  SetupApplyOrSyncWorker(worker_slot);
4692 
4693  InitializingApplyWorker = false;
4694 
4695  run_apply_worker();
4696 
4697  proc_exit(0);
4698 }
bool InitializingApplyWorker
Definition: worker.c:318
static void run_apply_worker()
Definition: worker.c:4457
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4644
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 5003 of file worker.c.

5004 {
5005  if (isCommit && on_commit_wakeup_workers_subids != NIL)
5006  {
5007  ListCell *lc;
5008 
5009  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5010  foreach(lc, on_commit_wakeup_workers_subids)
5011  {
5012  Oid subid = lfirst_oid(lc);
5013  List *workers;
5014  ListCell *lc2;
5015 
5016  workers = logicalrep_workers_find(subid, true);
5017  foreach(lc2, workers)
5018  {
5019  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5020 
5021  logicalrep_worker_wakeup_ptr(worker);
5022  }
5023  }
5024  LWLockRelease(LogicalRepWorkerLock);
5025  }
5026 
5027  /* The List storage will be reclaimed automatically in xact cleanup. */
5028  on_commit_wakeup_workers_subids = NIL;
5029 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:301
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:696
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:275
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
@ LW_SHARED
Definition: lwlock.h:115

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inline static

Definition at line 4191 of file worker.c.

4192 {
4193  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4194 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2479 of file worker.c.

2480 {
2481  /*
2482  * For partitioned tables, we only need to care if the target partition is
2483  * updatable (aka has PK or RI defined for it).
2484  */
2485  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2486  return;
2487 
2488  /* Updatable, no error. */
2489  if (rel->updatable)
2490  return;
2491 
2492  /*
2493  * We are in error mode so it's fine this is somewhat slow. It's better to
2494  * give user correct error.
2495  */
2496  if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2497  {
2498  ereport(ERROR,
2499  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2500  errmsg("publisher did not send replica identity column "
2501  "expected by the logical replication target relation \"%s.%s\"",
2502  rel->remoterel.nspname, rel->remoterel.relname)));
2503  }
2504 
2505  ereport(ERROR,
2506  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2507  errmsg("logical replication target relation \"%s.%s\" has "
2508  "neither REPLICA IDENTITY index nor PRIMARY "
2509  "KEY and published relation does not have "
2510  "REPLICA IDENTITY FULL",
2511  rel->remoterel.nspname, rel->remoterel.relname)));
2512 }
#define OidIsValid(objectId)
Definition: c.h:775
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:851

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inline static

Definition at line 4388 of file worker.c.

4389 {
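4390  if (subxact_data.subxacts)
4391  pfree(subxact_data.subxacts);
4392 
4393  subxact_data.subxacts = NULL;
4394  subxact_data.subxact_last = InvalidTransactionId;
4395  subxact_data.nsubxacts = 0;
4396  subxact_data.nsubxacts_max = 0;
4397 }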
static ApplySubXactData subxact_data
Definition: worker.c:357
void pfree(void *pointer)
Definition: mcxt.c:1520
uint32 nsubxacts
Definition: worker.c:351
uint32 nsubxacts_max
Definition: worker.c:352
SubXactInfo * subxacts
Definition: worker.c:354
TransactionId subxact_last
Definition: worker.c:353

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 4812 of file worker.c.

4813 {
4814  Relation rel;
4815  Form_pg_subscription subform;
4816  HeapTuple tup;
4817  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4818  bool started_tx = false;
4819 
4820  if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4821  return;
4822 
4823  if (!IsTransactionState())
4824  {
4825  StartTransactionCommand();
4826  started_tx = true;
4827  }
4828 
4829  /*
4830  * Protect subskiplsn of pg_subscription from being concurrently updated
4831  * while clearing it.
4832  */
4833  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4834  AccessShareLock);
4835 
4836  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4837 
4838  /* Fetch the existing tuple. */
4839  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4840  ObjectIdGetDatum(MySubscription->oid));
4841 
4842  if (!HeapTupleIsValid(tup))
4843  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4844 
4845  subform = (Form_pg_subscription) GETSTRUCT(tup);
4846 
4847  /*
4848  * Clear the subskiplsn. If the user has already changed subskiplsn before
4849  * clearing it we don't update the catalog and the replication origin
4850  * state won't get advanced. So in the worst case, if the server crashes
4851  * before sending an acknowledgment of the flush position the transaction
4852  * will be sent again and the user needs to set subskiplsn again. We can
4853  * reduce the possibility by logging a replication origin WAL record to
4854  * advance the origin LSN instead but there is no way to advance the
4855  * origin timestamp and it doesn't seem to be worth doing anything about
4856  * it since it's a very rare case.
4857  */
4858  if (subform->subskiplsn == myskiplsn)
4859  {
4860  bool nulls[Natts_pg_subscription];
4861  bool replaces[Natts_pg_subscription];
4862  Datum values[Natts_pg_subscription];
4863 
4864  memset(values, 0, sizeof(values));
4865  memset(nulls, false, sizeof(nulls));
4866  memset(replaces, false, sizeof(replaces));
4867 
4868  /* reset subskiplsn */
4869  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4870  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4871 
4872  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4873  replaces);
4874  CatalogTupleUpdate(rel, &tup->t_self, tup);
4875 
4876  if (myskiplsn != finish_lsn)
4877  ereport(WARNING,
4878  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4879  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4880  LSN_FORMAT_ARGS(finish_lsn),
4881  LSN_FORMAT_ARGS(myskiplsn)));
4882  }
4883 
4884  heap_freetuple(tup);
4885  table_close(rel, NoLock);
4886 
4887  if (started_tx)
4888  CommitTransactionCommand();
4889 }
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define likely(x)
Definition: c.h:310
int errdetail(const char *fmt,...)
Definition: elog.c:1205
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1083
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 652 of file worker.c.

653 {
654  ApplyExecutionData *edata;
655  EState *estate;
656  RangeTblEntry *rte;
657  List *perminfos = NIL;
658  ResultRelInfo *resultRelInfo;
659 
660  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
661  edata->targetRel = rel;
662 
663  edata->estate = estate = CreateExecutorState();
664 
665  rte = makeNode(RangeTblEntry);
666  rte->rtekind = RTE_RELATION;
667  rte->relid = RelationGetRelid(rel->localrel);
668  rte->relkind = rel->localrel->rd_rel->relkind;
669  rte->rellockmode = AccessShareLock;
670 
671  addRTEPermissionInfo(&perminfos, rte);
672 
673  ExecInitRangeTable(estate, list_make1(rte), perminfos);
674 
675  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
676 
677  /*
678  * Use Relation opened by logicalrep_rel_open() instead of opening it
679  * again.
680  */
681  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
682 
683  /*
684  * We put the ResultRelInfo in the es_opened_result_relations list, even
685  * though we don't populate the es_result_relations array. That's a bit
686  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
687  *
688  * ExecOpenIndices() is not called here either, each execution path doing
689  * an apply operation being responsible for that.
690  */
691  estate->es_opened_result_relations =
692  lappend(estate->es_opened_result_relations, resultRelInfo);
693 
694  estate->es_output_cid = GetCurrentCommandId(true);
695 
696  /* Prepare to catch AFTER triggers. */
697  AfterTriggerBeginQuery();
698 
699  /* other fields of edata remain NULL for now */
700 
701  return edata;
702 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1199
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:728
EState * CreateExecutorState(void)
Definition: execUtils.c:88
void * palloc0(Size size)
Definition: mcxt.c:1346
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1028
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:505
List * es_opened_result_relations
Definition: execnodes.h:645
CommandId es_output_cid
Definition: execnodes.h:639
RTEKind rtekind
Definition: parsenodes.h:1057
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5018
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:826

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 4705 of file worker.c.

4706 {
4707  /*
4708  * Emit the error message, and recover from the error state to an idle
4709  * state
4710  */
4711  HOLD_INTERRUPTS();
4712 
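4713  EmitErrorReport();
4714  AbortOutOfAnyTransaction();
4715  FlushErrorState();
4716 
4717  RESUME_INTERRUPTS();
4718 
4719  /* Report the worker failed during either table synchronization or apply */
4720  pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4721  !am_tablesync_worker());
4722 
4723  /* Disable the subscription */
4724  StartTransactionCommand();
4725  DisableSubscription(MySubscription->oid);
4726  CommitTransactionCommand();
4727 
4728  /* Ensure we remove no-longer-useful entry for worker's start time */
4729  if (am_leader_apply_worker())
4730  ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4731 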
4732  /* Notify the subscription has been disabled and exit */
4733  ereport(LOG,
4734  errmsg("subscription \"%s\" has been disabled because of an error",
4735  MySubscription->name));
4736 
4737  proc_exit(0);
4738 }
void EmitErrorReport(void)
Definition: elog.c:1672
void FlushErrorState(void)
Definition: elog.c:1836
#define LOG
Definition: elog.h:31
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4811

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 1969 of file worker.c.

1971 {
1972  char path[MAXPGPATH];
1973  BufFile *fd;
1974  int last_fileno;
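1975  off_t last_offset;
1976 
1977  Assert(!IsTransactionState());
1978 
1979  begin_replication_step();
1980 
1981  changes_filename(path, MyLogicalRepWorker->subid, xid);
1982 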
1983  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
1984 
1985  BufFileSeek(fd, 0, 0, SEEK_END);
1986  BufFileTell(fd, &last_fileno, &last_offset);
1987 
1988  BufFileClose(fd);
1989 
1990  end_replication_step();
1991 
1992  if (last_fileno != fileno || last_offset != offset)
1993  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
1994  path);
1995 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2846 of file worker.c.

2851 {
2852  EState *estate = edata->estate;
2853  bool found;
2854 
2855  /*
2856  * Regardless of the top-level operation, we're performing a read here, so
2857  * check for SELECT privileges.
2858  */
2859  TargetPrivilegesCheck(localrel, ACL_SELECT);
2860 
2861  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2862 
2863  Assert(OidIsValid(localidxoid) ||
2864  (remoterel->replident == REPLICA_IDENTITY_FULL));
2865 
2866  if (OidIsValid(localidxoid))
2867  {
2868 #ifdef USE_ASSERT_CHECKING
2869  Relation idxrel = index_open(localidxoid, AccessShareLock);
2870 
2871  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2872  Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2873  IsIndexUsableForReplicaIdentityFull(BuildIndexInfo(idxrel),
2874  edata->targetRel->attrmap));
2875  index_close(idxrel, AccessShareLock);
2876 #endif
2877 
2878  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2879  LockTupleExclusive,
2880  remoteslot, *localslot);
2881  }
2882  else
2883  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2884  remoteslot, *localslot);
2885 
2886  return found;
2887 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2407
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap)
Definition: relation.c:804

References AccessShareLock, ACL_SELECT, Assert, LogicalRepRelMapEntry::attrmap, BuildIndexInfo(), EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 709 of file worker.c.

710 {
711  EState *estate = edata->estate;
712 
713  /* Handle any queued AFTER triggers. */
714  AfterTriggerEndQuery(estate);
715 
716  /* Shut down tuple routing, if any was done. */
717  if (edata->proute)
718  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
719 
720  /*
721  * Cleanup. It might seem that we should call ExecCloseResultRelations()
722  * here, but we intentionally don't. It would close the rel we added to
723  * es_opened_result_relations above, which is wrong because we took no
724  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
725  * any other relations opened during execution.
726  */
727  ExecResetTupleTable(estate->es_tupleTable, false);
728  FreeExecutorState(estate);
729  pfree(edata);
730 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1278
void FreeExecutorState(EState *estate)
Definition: execUtils.c:189
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5038

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 3392 of file worker.c.

3394 {
3395  dlist_mutable_iter iter;
3396  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3397 
3398  *write = InvalidXLogRecPtr;
3399  *flush = InvalidXLogRecPtr;
3400 
3401  dlist_foreach_modify(iter, &lsn_mapping)
3402  {
3403  FlushPosition *pos =
3404  dlist_container(FlushPosition, node, iter.cur);
3405 
3406  *write = pos->remote_end;
3407 
3408  if (pos->local_end <= local_flush)
3409  {
3410  *flush = pos->remote_end;
3411  dlist_delete(iter.cur);
3412  pfree(pos);
3413  }
3414  else
3415  {
3416  /*
3417  * Don't want to uselessly iterate over the rest of the list which
3418  * could potentially be long. Instead get the last element and
3419  * grab the write position from there.
3420  */
3421  pos = dlist_tail_element(FlushPosition, node,
3422  &lsn_mapping);
3423  *write = pos->remote_end;
3424  *have_pending_txes = true;
3425  return;
3426  }
3427  }
3428 
3429  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3430 }
static dlist_head lsn_mapping
Definition: worker.c:204
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:201
XLogRecPtr local_end
Definition: worker.c:200
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6455

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 5050 of file worker.c.

5051 {
5052  *winfo = NULL;
5053 
5054  if (am_parallel_apply_worker())
5055  {
5056  return TRANS_PARALLEL_APPLY;
5057  }
5058 
5059  /*
5060  * If we are processing this transaction using a parallel apply worker
5061  * then either we send the changes to the parallel worker or if the worker
5062  * is busy then serialize the changes to the file which will later be
5063  * processed by the parallel worker.
5064  */
5065  *winfo = pa_find_worker(xid);
5066 
5067  if (*winfo && (*winfo)->serialize_changes)
5068  {
5069  return TRANS_LEADER_PARTIAL_SERIALIZE;
5070  }
5071  else if (*winfo)
5072  {
5073  return TRANS_LEADER_SEND_TO_PARALLEL;
5074  }
5075 
5076  /*
5077  * If there is no parallel worker involved to process this transaction
5078  * then we either directly apply the change or serialize it to a file
5079  * which will later be applied when the transaction finish message is
5080  * processed.
5081  */
5082  else if (in_streamed_transaction)
5083  {
5084  return TRANS_LEADER_SERIALIZE;
5085  }
5086  else
5087  {
5088  return TRANS_LEADER_APPLY;
5089  }
5090 }
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 559 of file worker.c.

560 {
561  TransactionId current_xid;
562  ParallelApplyWorkerInfo *winfo;
563  TransApplyAction apply_action;
564  StringInfoData original_msg;
565 
566  apply_action = get_transaction_apply_action(stream_xid, &winfo);
567 
568  /* not in streaming mode */
569  if (apply_action == TRANS_LEADER_APPLY)
570  return false;
571 
572  Assert(TransactionIdIsValid(stream_xid));
573 
574  /*
575  * The parallel apply worker needs the xid in this message to decide
576  * whether to define a savepoint, so save the original message that has
577  * not moved the cursor after the xid. We will serialize this message to a
578  * file in PARTIAL_SERIALIZE mode.
579  */
580  original_msg = *s;
581 
582  /*
583  * We should have received XID of the subxact as the first part of the
584  * message, so extract it.
585  */
586  current_xid = pq_getmsgint(s, 4);
587 
588  if (!TransactionIdIsValid(current_xid))
589  ereport(ERROR,
590  (errcode(ERRCODE_PROTOCOL_VIOLATION),
591  errmsg_internal("invalid transaction ID in streamed replication transaction")));
592 
593  switch (apply_action)
594  {
595  case TRANS_LEADER_SERIALIZE:
596  Assert(stream_fd);
597 
598  /* Add the new subxact to the array (unless already there). */
599  subxact_info_add(current_xid);
600 
601  /* Write the change to the current file */
602  stream_write_change(action, s);
603  return true;
604 
605  case TRANS_LEADER_SEND_TO_PARALLEL:
606  Assert(winfo);
607 
608  /*
609  * XXX The publisher side doesn't always send relation/type update
610  * messages after the streaming transaction, so also update the
611  * relation/type in leader apply worker. See function
612  * cleanup_rel_sync_cache.
613  */
614  if (pa_send_data(winfo, s->len, s->data))
615  return (action != LOGICAL_REP_MSG_RELATION &&
616  action != LOGICAL_REP_MSG_TYPE);
617 
618  /*
619  * Switch to serialize mode when we are not able to send the
620  * change to parallel apply worker.
621  */
622  pa_switch_to_partial_serialize(winfo, false);
623 
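624  /* fall through */
625  case TRANS_LEADER_PARTIAL_SERIALIZE:
626  stream_write_change(action, &original_msg);
627 
628  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
629  return (action != LOGICAL_REP_MSG_RELATION &&
630  action != LOGICAL_REP_MSG_TYPE);
631 
632  case TRANS_PARALLEL_APPLY:
633  parallel_stream_nchanges += 1;
634 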
635  /* Define a savepoint for a subxact if needed. */
636  pa_start_subtrans(current_xid, stream_xid);
637  return false;
638 
639  default:
640  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
641  return false; /* silence compiler warning */
642  }
643 }
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4106
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415

References generate_unaccent_rules::action, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 4561 of file worker.c.

4562 {
4563  MemoryContext oldctx;
4564 
4565  /* Run as replica session replication role. */
4566  SetConfigOption("session_replication_role", "replica",
4568 
4569  /* Connect to our database. */
4572  0);
4573 
4574  /*
4575  * Set always-secure search path, so malicious users can't redirect user
4576  * code (e.g. pg_index.indexprs).
4577  */
4578  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4579 
4580  /* Load the subscription into persistent memory context. */
4582  "ApplyContext",
4586 
4588  if (!MySubscription)
4589  {
4590  ereport(LOG,
4591  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4593 
4594  /* Ensure we remove no-longer-useful entry for worker's start time */
4595  if (am_leader_apply_worker())
4597 
4598  proc_exit(0);
4599  }
4600 
4601  MySubscriptionValid = true;
4602  MemoryContextSwitchTo(oldctx);
4603 
4604  if (!MySubscription->enabled)
4605  {
4606  ereport(LOG,
4607  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4608  MySubscription->name)));
4609 
4611  }
4612 
4613  /* Setup synchronous commit according to the user's wishes */
4614  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4616 
4617  /*
4618  * Keep us informed about subscription or role changes. Note that the
4619  * role's superuser privilege can be revoked.
4620  */
4621  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4623  (Datum) 0);
4624 
4627  (Datum) 0);
4628 
4629  if (am_tablesync_worker())
4630  ereport(LOG,
4631  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4634  else
4635  ereport(LOG,
4636  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4637  MySubscription->name)));
4638 
4640 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3991
static void apply_worker_exit(void)
Definition: worker.c:3831
MemoryContext ApplyContext
Definition: worker.c:291
static bool MySubscriptionValid
Definition: worker.c:299
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4285
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
MemoryContext TopMemoryContext
Definition: mcxt.c:149
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:4189

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4753 of file worker.c.

4754 {
4755  return IsLogicalWorker() && am_parallel_apply_worker();
4756 }
bool IsLogicalWorker(void)
Definition: worker.c:4744

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4744 of file worker.c.

4745 {
4746  return MyLogicalRepWorker != NULL;
4747 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3478 of file worker.c.

3479 {
3480  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3481  bool ping_sent = false;
3482  TimeLineID tli;
3483  ErrorContextCallback errcallback;
3484 
3485  /*
3486  * Init the ApplyMessageContext which we clean up after each replication
3487  * protocol message.
3488  */
3490  "ApplyMessageContext",
3492 
3493  /*
3494  * This memory context is used for per-stream data when the streaming mode
3495  * is enabled. This context is reset on each stream stop.
3496  */
3498  "LogicalStreamingContext",
3500 
3501  /* mark as idle, before starting to loop */
3503 
3504  /*
3505  * Push apply error context callback. Fields will be filled while applying
3506  * a change.
3507  */
3508  errcallback.callback = apply_error_callback;
3509  errcallback.previous = error_context_stack;
3510  error_context_stack = &errcallback;
3512 
3513  /* This outer loop iterates once per wait. */
3514  for (;;)
3515  {
3517  int rc;
3518  int len;
3519  char *buf = NULL;
3520  bool endofstream = false;
3521  long wait_time;
3522 
3524 
3526 
3528 
3529  if (len != 0)
3530  {
3531  /* Loop to process all available data (without blocking). */
3532  for (;;)
3533  {
3535 
3536  if (len == 0)
3537  {
3538  break;
3539  }
3540  else if (len < 0)
3541  {
3542  ereport(LOG,
3543  (errmsg("data stream from publisher has ended")));
3544  endofstream = true;
3545  break;
3546  }
3547  else
3548  {
3549  int c;
3550  StringInfoData s;
3551 
3552  if (ConfigReloadPending)
3553  {
3554  ConfigReloadPending = false;
3555  ProcessConfigFile(PGC_SIGHUP);
3556  }
3557 
3558  /* Reset timeout. */
3559  last_recv_timestamp = GetCurrentTimestamp();
3560  ping_sent = false;
3561 
3562  /* Ensure we are reading the data into our memory context. */
3564 
3566 
3567  c = pq_getmsgbyte(&s);
3568 
3569  if (c == 'w')
3570  {
3571  XLogRecPtr start_lsn;
3572  XLogRecPtr end_lsn;
3573  TimestampTz send_time;
3574 
3575  start_lsn = pq_getmsgint64(&s);
3576  end_lsn = pq_getmsgint64(&s);
3577  send_time = pq_getmsgint64(&s);
3578 
3579  if (last_received < start_lsn)
3580  last_received = start_lsn;
3581 
3582  if (last_received < end_lsn)
3583  last_received = end_lsn;
3584 
3585  UpdateWorkerStats(last_received, send_time, false);
3586 
3587  apply_dispatch(&s);
3588  }
3589  else if (c == 'k')
3590  {
3591  XLogRecPtr end_lsn;
3592  TimestampTz timestamp;
3593  bool reply_requested;
3594 
3595  end_lsn = pq_getmsgint64(&s);
3596  timestamp = pq_getmsgint64(&s);
3597  reply_requested = pq_getmsgbyte(&s);
3598 
3599  if (last_received < end_lsn)
3600  last_received = end_lsn;
3601 
3602  send_feedback(last_received, reply_requested, false);
3603  UpdateWorkerStats(last_received, timestamp, true);
3604  }
3605  /* other message types are purposefully ignored */
3606 
3608  }
3609 
3611  }
3612  }
3613 
3614  /* confirm all writes so far */
3615  send_feedback(last_received, false, false);
3616 
3617  if (!in_remote_transaction && !in_streamed_transaction)
3618  {
3619  /*
3620  * If we didn't get any transactions for a while there might be
3621  * unconsumed invalidation messages in the queue, consume them
3622  * now.
3623  */
3626 
3627  /* Process any table synchronization changes. */
3628  process_syncing_tables(last_received);
3629  }
3630 
3631  /* Cleanup the memory. */
3634 
3635  /* Check if we need to exit the streaming loop. */
3636  if (endofstream)
3637  break;
3638 
3639  /*
3640  * Wait for more data or latch. If we have unflushed transactions,
3641  * wake up after WalWriterDelay to see if they've been flushed yet (in
3642  * which case we should send a feedback message). Otherwise, there's
3643  * no particular urgency about waking up unless we get data or a
3644  * signal.
3645  */
3646  if (!dlist_is_empty(&lsn_mapping))
3647  wait_time = WalWriterDelay;
3648  else
3649  wait_time = NAPTIME_PER_CYCLE;
3650 
3654  fd, wait_time,
3655  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3656 
3657  if (rc & WL_LATCH_SET)
3658  {
3661  }
3662 
3663  if (ConfigReloadPending)
3664  {
3665  ConfigReloadPending = false;
3666  ProcessConfigFile(PGC_SIGHUP);
3667  }
3668 
3669  if (rc & WL_TIMEOUT)
3670  {
3671  /*
3672  * We didn't receive anything new. If we haven't heard anything
3673  * from the server for more than wal_receiver_timeout / 2, ping
3674  * the server. Also, if it's been longer than
3675  * wal_receiver_status_interval since the last update we sent,
3676  * send a status update to the primary anyway, to report any
3677  * progress in applying WAL.
3678  */
3679  bool requestReply = false;
3680 
3681  /*
3682  * Check if time since last receive from primary has reached the
3683  * configured limit.
3684  */
3685  if (wal_receiver_timeout > 0)
3686  {
3687  TimestampTz now = GetCurrentTimestamp();
3688  TimestampTz timeout;
3689 
3690  timeout =
3691  TimestampTzPlusMilliseconds(last_recv_timestamp,
3692  wal_receiver_timeout);
3693 
3694  if (now >= timeout)
3695  ereport(ERROR,
3696  (errcode(ERRCODE_CONNECTION_FAILURE),
3697  errmsg("terminating logical replication worker due to timeout")));
3698 
3699  /* Check to see if it's time for a ping. */
3700  if (!ping_sent)
3701  {
3702  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3703  (wal_receiver_timeout / 2));
3704  if (now >= timeout)
3705  {
3706  requestReply = true;
3707  ping_sent = true;
3708  }
3709  }
3710  }
3711 
3712  send_feedback(last_received, requestReply, requestReply);
3713 
3714  /*
3715  * Force reporting to ensure long idle periods don't lead to
3716  * arbitrarily delayed stats. Stats can only be reported outside
3717  * of (implicit or explicit) transactions. That shouldn't lead to
3718  * stats being delayed for long, because transactions are either
3719  * sent as a whole on commit or streamed. Streamed transactions
3720  * are spilled to disk and applied on commit.
3721  */
3722  if (!IsTransactionState())
3723  pgstat_report_stat(true);
3724  }
3725  }
3726 
3727  /* Pop the error context stack */
3728  error_context_stack = errcallback.previous;
3730 
3731  /* All done */
3732  walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
3733 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3462
#define NAPTIME_PER_CYCLE
Definition: worker.c:195
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:288
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3742
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:296
void apply_error_callback(void *arg)
Definition: worker.c:4893
static MemoryContext LogicalStreamingContext
Definition: worker.c:294
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:94
struct Latch * MyLatch
Definition: globals.c:60
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static char * buf
Definition: pg_test_fsync.c:73
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
char * c
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_receiver_timeout
Definition: walreceiver.c:88
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:450
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:452
int WalWriterDelay
Definition: walwriter.c:71
uint32 TimeLineID
Definition: xlogdefs.h:59

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3862 of file worker.c.

3863 {
3864  MemoryContext oldctx;
3866  bool started_tx = false;
3867 
3868  /* When cache state is valid there is nothing to do here. */
3869  if (MySubscriptionValid)
3870  return;
3871 
3872  /* This function might be called inside or outside of transaction. */
3873  if (!IsTransactionState())
3874  {
3875  StartTransactionCommand();
3876  started_tx = true;
3877  }
3878 
3879  /* Ensure allocations in permanent context. */
3881 
3883 
3884  /*
3885  * Exit if the subscription was removed. This normally should not happen
3886  * as the worker gets killed during DROP SUBSCRIPTION.
3887  */
3888  if (!newsub)
3889  {
3890  ereport(LOG,
3891  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3892  MySubscription->name)));
3893 
3894  /* Ensure we remove no-longer-useful entry for worker's start time */
3895  if (am_leader_apply_worker())
3897 
3898  proc_exit(0);
3899  }
3900 
3901  /* Exit if the subscription was disabled. */
3902  if (!newsub->enabled)
3903  {
3904  ereport(LOG,
3905  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3906  MySubscription->name)));
3907 
3909  }
3910 
3911  /* !slotname should never happen when enabled is true. */
3912  Assert(newsub->slotname);
3913 
3914  /* two-phase should not be altered */
3915  Assert(newsub->twophasestate == MySubscription->twophasestate);
3916 
3917  /*
3918  * Exit if any parameter that affects the remote connection was changed.
3919  * The launcher will start a new worker but note that the parallel apply
3920  * worker won't restart if the streaming option's value is changed from
3921  * 'parallel' to any other value or the server decides not to stream the
3922  * in-progress transaction.
3923  */
3924  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3925  strcmp(newsub->name, MySubscription->name) != 0 ||
3926  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3927  newsub->binary != MySubscription->binary ||
3928  newsub->stream != MySubscription->stream ||
3929  newsub->passwordrequired != MySubscription->passwordrequired ||
3930  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3931  newsub->owner != MySubscription->owner ||
3932  !equal(newsub->publications, MySubscription->publications))
3933  {
3934  if (am_parallel_apply_worker())
3935  ereport(LOG,
3936  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3937  MySubscription->name)));
3938  else
3939  ereport(LOG,
3940  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3941  MySubscription->name)));
3942 
3944  }
3945 
3946  /*
3947  * Exit if the subscription owner's superuser privileges have been
3948  * revoked.
3949  */
3950  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3951  {
3952  if (am_parallel_apply_worker())
3953  ereport(LOG,
3954  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3955  MySubscription->name));
3956  else
3957  ereport(LOG,
3958  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3959  MySubscription->name));
3960 
3962  }
3963 
3964  /* Check for other changes that should never happen too. */
3965  if (newsub->dbid != MySubscription->dbid)
3966  {
3967  elog(ERROR, "subscription %u changed unexpectedly",
3969  }
3970 
3971  /* Clean old subscription info and switch to new one. */
3974 
3975  MemoryContextSwitchTo(oldctx);
3976 
3977  /* Change synchronous commit according to the user's wishes */
3978  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3980 
3981  if (started_tx)
3982  CommitTransactionCommand();
3983 
3984  MySubscriptionValid = true;
3985 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 4763 of file worker.c.

4764 {
4768 
4769  /*
4770  * Quick return if it's not requested to skip this transaction. This
4771  * function is called for every remote transaction and we assume that
4772  * skipping the transaction is not used often.
4773  */
4775  MySubscription->skiplsn != finish_lsn))
4776  return;
4777 
4778  /* Start skipping all changes of this transaction */
4779  skip_xact_finish_lsn = finish_lsn;
4780 
4781  ereport(LOG,
4782  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4784 }
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:335

References Assert, ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 428 of file worker.c.

430 {
431  if (OidIsValid(relid))
432  {
433  /* Replication origin name for tablesync workers. */
434  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
435  }
436  else
437  {
438  /* Replication origin name for non-tablesync workers. */
439  snprintf(originname, szoriginname, "pg_%u", suboid);
440  }
441 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), run_apply_worker(), and run_tablesync_worker().

◆ reset_apply_error_context_info()

◆ run_apply_worker()

static void run_apply_worker ( )
static

Definition at line 4457 of file worker.c.

4458 {
4459  char originname[NAMEDATALEN];
4460  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4461  char *slotname = NULL;
4463  RepOriginId originid;
4464  TimeLineID startpointTLI;
4465  char *err;
4466  bool must_use_password;
4467 
4468  slotname = MySubscription->slotname;
4469 
4470  /*
4471  * This shouldn't happen if the subscription is enabled, but guard against
4472  * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4473  * slot is NULL.)
4474  */
4475  if (!slotname)
4476  ereport(ERROR,
4477  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4478  errmsg("subscription has no replication slot set")));
4479 
4480  /* Setup replication origin tracking. */
4482  originname, sizeof(originname));
4484  originid = replorigin_by_name(originname, true);
4485  if (!OidIsValid(originid))
4486  originid = replorigin_create(originname);
4487  replorigin_session_setup(originid, 0);
4488  replorigin_session_origin = originid;
4489  origin_startpos = replorigin_session_get_progress(false);
4491 
4492  /* Is the use of a password mandatory? */
4493  must_use_password = MySubscription->passwordrequired &&
4495 
4497  true, must_use_password,
4498  MySubscription->name, &err);
4499 
4500  if (LogRepWorkerWalRcvConn == NULL)
4501  ereport(ERROR,
4502  (errcode(ERRCODE_CONNECTION_FAILURE),
4503  errmsg("could not connect to the publisher: %s", err)));
4504 
4505  /*
4506  * We don't really use the output identify_system for anything but it does
4507  * some initializations on the upstream so let's still call it.
4508  */
4509  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4510 
4511  set_apply_error_context_origin(originname);
4512 
4513  set_stream_options(&options, slotname, &origin_startpos);
4514 
4515  /*
4516  * Even when the two_phase mode is requested by the user, it remains as
4517  * the tri-state PENDING until all tablesyncs have reached READY state.
4518  * Only then, can it become ENABLED.
4519  *
4520  * Note: If the subscription has no tables then leave the state as
4521  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4522  * work.
4523  */
4526  {
4527  /* Start streaming with two_phase enabled */
4528  options.proto.logical.twophase = true;
4530 
4535  }
4536  else
4537  {
4539  }
4540 
4541  ereport(DEBUG1,
4542  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4547  "?")));
4548 
4549  /* Run the main loop. */
4550  start_apply(origin_startpos);
4551 }
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4338
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4425
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:428
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5035
void err(int eval, const char *fmt,...)
Definition: err.c:43
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:155
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1237
#define NAMEDATALEN
static char ** options
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
bool AllTablesyncsReady(void)
Definition: tablesync.c:1731
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1756
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:448
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:432
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:440
uint16 RepOriginId
Definition: xlogdefs.h:65

References AllTablesyncsReady(), CommitTransactionCommand(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, InvalidOid, InvalidXLogRecPtr, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, LogRepWorkerWalRcvConn, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::ownersuperuser, Subscription::passwordrequired, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), set_apply_error_context_origin(), set_stream_options(), Subscription::slotname, start_apply(), StartTransactionCommand(), Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by ApplyWorkerMain().

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 3742 of file worker.c.

3743 {
3744  static StringInfo reply_message = NULL;
3745  static TimestampTz send_time = 0;
3746 
3747  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3748  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3749  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3750 
3751  XLogRecPtr writepos;
3752  XLogRecPtr flushpos;
3753  TimestampTz now;
3754  bool have_pending_txes;
3755 
3756  /*
3757  * If the user doesn't want status to be reported to the publisher, be
3758  * sure to exit before doing anything at all.
3759  */
3760  if (!force && wal_receiver_status_interval <= 0)
3761  return;
3762 
3763  /* It's legal to not pass a recvpos */
3764  if (recvpos < last_recvpos)
3765  recvpos = last_recvpos;
3766 
3767  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3768 
3769  /*
3770  * No outstanding transactions to flush, we can report the latest received
3771  * position. This is important for synchronous replication.
3772  */
3773  if (!have_pending_txes)
3774  flushpos = writepos = recvpos;
3775 
3776  if (writepos < last_writepos)
3777  writepos = last_writepos;
3778 
3779  if (flushpos < last_flushpos)
3780  flushpos = last_flushpos;
3781 
3783 
3784  /* if we've already reported everything we're good */
3785  if (!force &&
3786  writepos == last_writepos &&
3787  flushpos == last_flushpos &&
3788  !TimestampDifferenceExceeds(send_time, now,
3790  return;
3791  send_time = now;
3792 
3793  if (!reply_message)
3794  {
3796 
3798  MemoryContextSwitchTo(oldctx);
3799  }
3800  else
3802 
3803  pq_sendbyte(reply_message, 'r');
3804  pq_sendint64(reply_message, recvpos); /* write */
3805  pq_sendint64(reply_message, flushpos); /* flush */
3806  pq_sendint64(reply_message, writepos); /* apply */
3807  pq_sendint64(reply_message, now); /* sendTime */
3808  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3809 
3810  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3811  force,
3812  LSN_FORMAT_ARGS(recvpos),
3813  LSN_FORMAT_ARGS(writepos),
3814  LSN_FORMAT_ARGS(flushpos));
3815 
3818 
3819  if (recvpos > last_recvpos)
3820  last_recvpos = recvpos;
3821  if (writepos > last_writepos)
3822  last_writepos = writepos;
3823  if (flushpos > last_flushpos)
3824  last_flushpos = flushpos;
3825 }
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3392
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1790
#define DEBUG2
Definition: elog.h:29
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static StringInfoData reply_message
Definition: walreceiver.c:131
int wal_receiver_status_interval
Definition: walreceiver.c:87
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:454

References ApplyContext, StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 5035 of file worker.c.

5036 {
5038  originname);
5039 }
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1682

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_xact()

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char *  slotname,
XLogRecPtr origin_startpos 
)

Definition at line 4338 of file worker.c.

4341 {
4342  int server_version;
4343 
4344  options->logical = true;
4345  options->startpoint = *origin_startpos;
4346  options->slotname = slotname;
4347 
4349  options->proto.logical.proto_version =
4354 
4355  options->proto.logical.publication_names = MySubscription->publications;
4356  options->proto.logical.binary = MySubscription->binary;
4357 
4358  /*
4359  * Assign the appropriate option value for streaming option according to
4360  * the 'streaming' mode and the publisher's ability to support that mode.
4361  */
4362  if (server_version >= 160000 &&
4364  {
4365  options->proto.logical.streaming_str = "parallel";
4367  }
4368  else if (server_version >= 140000 &&
4370  {
4371  options->proto.logical.streaming_str = "on";
4373  }
4374  else
4375  {
4376  options->proto.logical.streaming_str = NULL;
4378  }
4379 
4380  options->proto.logical.twophase = false;
4381  options->proto.logical.origin = pstrdup(MySubscription->origin);
4382 }
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1695
static int server_version
Definition: pg_dumpall.c:110
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define walrcv_server_version(conn)
Definition: walreceiver.h:444

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_PARALLEL, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 4644 of file worker.c.

4645 {
4646  /* Attach to slot */
4647  logicalrep_worker_attach(worker_slot);
4648 
4650 
4651  /* Setup signal handling */
4653  pqsignal(SIGTERM, die);
4655 
4656  /*
4657  * We don't currently need any ResourceOwner in a walreceiver process, but
4658  * if we did, we could call CreateAuxProcessResourceOwner here.
4659  */
4660 
4661  /* Initialize stats to a sane-ish value */
4664 
4665  /* Load the libpq-specific functions */
4666  load_file("libpqwalreceiver", false);
4667 
4669 
4670  /* Connect to the origin and start the replication. */
4671  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4673 
4674  /*
4675  * Setup callback for syscache so that we know when something changes in
4676  * the subscription relation state.
4677  */
4678  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4680  (Datum) 0);
4681 }
void InitializeLogRepWorker(void)
Definition: worker.c:4561
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition: launcher.c:707
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:4229
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:281
#define SIGHUP
Definition: win32_port.h:168

References am_leader_apply_worker(), am_tablesync_worker(), Assert, BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog, GetCurrentTimestamp(), InitializeLogRepWorker(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal(), LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), and TablesyncWorkerMain().

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 468 of file worker.c.

469 {
470  switch (MyLogicalRepWorker->type)
471  {
473  return MyLogicalRepWorker->relid == rel->localreloid;
474 
476  /* We don't synchronize rels that are in unknown state. */
477  if (rel->state != SUBREL_STATE_READY &&
478  rel->state != SUBREL_STATE_UNKNOWN)
479  ereport(ERROR,
480  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
481  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
483  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
484 
485  return rel->state == SUBREL_STATE_READY;
486 
487  case WORKERTYPE_APPLY:
488  return (rel->state == SUBREL_STATE_READY ||
489  (rel->state == SUBREL_STATE_SYNCDONE &&
490  rel->statelsn <= remote_final_lsn));
491 
492  case WORKERTYPE_UNKNOWN:
493  /* Should never happen. */
494  elog(ERROR, "Unknown worker type");
495  }
496 
497  return false; /* dummy for compiler */
498 }
LogicalRepWorkerType type
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY

References elog, ereport, errcode(), errdetail(), errmsg(), ERROR, LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 740 of file worker.c.

742 {
743  TupleDesc desc = RelationGetDescr(rel->localrel);
744  int num_phys_attrs = desc->natts;
745  int i;
746  int attnum,
747  num_defaults = 0;
748  int *defmap;
749  ExprState **defexprs;
750  ExprContext *econtext;
751 
752  econtext = GetPerTupleExprContext(estate);
753 
754  /* We got all the data via replication, no need to evaluate anything. */
755  if (num_phys_attrs == rel->remoterel.natts)
756  return;
757 
758  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
759  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
760 
761  Assert(rel->attrmap->maplen == num_phys_attrs);
762  for (attnum = 0; attnum < num_phys_attrs; attnum++)
763  {
764  Expr *defexpr;
765 
766  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
767  continue;
768 
769  if (rel->attrmap->attnums[attnum] >= 0)
770  continue;
771 
772  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
773 
774  if (defexpr != NULL)
775  {
776  /* Run the expression through planner */
777  defexpr = expression_planner(defexpr);
778 
779  /* Initialize executable expression (no parent PlanState) */
780  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
781  defmap[num_defaults] = attnum;
782  num_defaults++;
783  }
784  }
785 
786  for (i = 0; i < num_defaults; i++)
787  slot->tts_values[defmap[i]] =
788  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
789 }
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:134
#define GetPerTupleExprContext(estate)
Definition: executor.h:550
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:333
int16 attnum
Definition: pg_attribute.h:74
Expr * expression_planner(Expr *expr)
Definition: planner.c:6457
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 898 of file worker.c.

901 {
902  int natts = slot->tts_tupleDescriptor->natts;
903  int i;
904 
905  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
906  ExecClearTuple(slot);
907 
908  /*
909  * Copy all the column data from srcslot, so that we'll have valid values
910  * for unreplaced columns.
911  */
912  Assert(natts == srcslot->tts_tupleDescriptor->natts);
913  slot_getallattrs(srcslot);
914  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
915  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
916 
917  /* Call the "in" function for each replaced attribute */
918  Assert(natts == rel->attrmap->maplen);
919  for (i = 0; i < natts; i++)
920  {
922  int remoteattnum = rel->attrmap->attnums[i];
923 
924  if (remoteattnum < 0)
925  continue;
926 
927  Assert(remoteattnum < tupleData->ncols);
928 
929  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
930  {
931  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
932 
933  /* Set attnum for error callback */
935 
936  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
937  {
938  Oid typinput;
939  Oid typioparam;
940 
941  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
942  slot->tts_values[i] =
943  OidInputFunctionCall(typinput, colvalue->data,
944  typioparam, att->atttypmod);
945  slot->tts_isnull[i] = false;
946  }
947  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
948  {
949  Oid typreceive;
950  Oid typioparam;
951 
952  /*
953  * In some code paths we may be asked to re-parse the same
954  * tuple data. Reset the StringInfo's cursor so that works.
955  */
956  colvalue->cursor = 0;
957 
958  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
959  slot->tts_values[i] =
960  OidReceiveFunctionCall(typreceive, colvalue,
961  typioparam, att->atttypmod);
962 
963  /* Trouble if it didn't eat the whole buffer */
964  if (colvalue->cursor != colvalue->len)
965  ereport(ERROR,
966  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
967  errmsg("incorrect binary data format in logical replication column %d",
968  remoteattnum + 1)));
969  slot->tts_isnull[i] = false;
970  }
971  else
972  {
973  /* must be LOGICALREP_COLUMN_NULL */
974  slot->tts_values[i] = (Datum) 0;
975  slot->tts_isnull[i] = true;
976  }
977 
978  /* Reset attnum for error callback */
980  }
981  }
982 
983  /* And finally, declare that "slot" contains a valid virtual tuple */
984  ExecStoreVirtualTuple(slot);
985 }
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1639
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1754
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2874
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2940
StringInfoData * colvalues
Definition: logicalproto.h:87

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 797 of file worker.c.

799 {
800  int natts = slot->tts_tupleDescriptor->natts;
801  int i;
802 
803  ExecClearTuple(slot);
804 
805  /* Call the "in" function for each non-dropped, non-null attribute */
806  Assert(natts == rel->attrmap->maplen);
807  for (i = 0; i < natts; i++)
808  {
810  int remoteattnum = rel->attrmap->attnums[i];
811 
812  if (!att->attisdropped && remoteattnum >= 0)
813  {
814  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
815 
816  Assert(remoteattnum < tupleData->ncols);
817 
818  /* Set attnum for error callback */
820 
821  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
822  {
823  Oid typinput;
824  Oid typioparam;
825 
826  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
827  slot->tts_values[i] =
828  OidInputFunctionCall(typinput, colvalue->data,
829  typioparam, att->atttypmod);
830  slot->tts_isnull[i] = false;
831  }
832  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
833  {
834  Oid typreceive;
835  Oid typioparam;
836 
837  /*
838  * In some code paths we may be asked to re-parse the same
839  * tuple data. Reset the StringInfo's cursor so that works.
840  */
841  colvalue->cursor = 0;
842 
843  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
844  slot->tts_values[i] =
845  OidReceiveFunctionCall(typreceive, colvalue,
846  typioparam, att->atttypmod);
847 
848  /* Trouble if it didn't eat the whole buffer */
849  if (colvalue->cursor != colvalue->len)
850  ereport(ERROR,
851  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
852  errmsg("incorrect binary data format in logical replication column %d",
853  remoteattnum + 1)));
854  slot->tts_isnull[i] = false;
855  }
856  else
857  {
858  /*
859  * NULL value from remote. (We don't expect to see
860  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
861  * NULL.)
862  */
863  slot->tts_values[i] = (Datum) 0;
864  slot->tts_isnull[i] = true;
865  }
866 
867  /* Reset attnum for error callback */
869  }
870  else
871  {
872  /*
873  * We assign NULL to dropped attributes and missing values
874  * (missing values should be later filled using
875  * slot_fill_defaults).
876  */
877  slot->tts_values[i] = (Datum) 0;
878  slot->tts_isnull[i] = true;
879  }
880  }
881 
882  ExecStoreVirtualTuple(slot);
883 }

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 4425 of file worker.c.

4426 {
4427  PG_TRY();
4428  {
4429  LogicalRepApplyLoop(origin_startpos);
4430  }
4431  PG_CATCH();
4432  {
4435  else
4436  {
4437  /*
4438  * Report the worker failed while applying changes. Abort the
4439  * current transaction so that the stats message is sent in an
4440  * idle state.
4441  */
4444 
4445  PG_RE_THROW();
4446  }
4447  }
4448  PG_END_TRY();
4449 }
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3478
void DisableSubscriptionAndExit(void)
Definition: worker.c:4705
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 4790 of file worker.c.

4791 {
4792  if (!is_skipping_changes())
4793  return;
4794 
4795  ereport(LOG,
4796  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4798 
4799  /* Stop skipping changes */
4801 }

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3436 of file worker.c.

3437 {
3438  FlushPosition *flushpos;
3439 
3440  /*
3441  * Skip for parallel apply workers, because the lsn_mapping is maintained
3442  * by the leader apply worker.
3443  */
3445  return;
3446 
3447  /* Need to do this in permanent context */
3449 
3450  /* Track commit lsn */
3451  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3452  flushpos->local_end = local_lsn;
3453  flushpos->remote_end = remote_lsn;
3454 
3455  dlist_push_tail(&lsn_mapping, &flushpos->node);
3457 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:199

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_abort_internal()

static void stream_abort_internal ( TransactionId  xid,
TransactionId  subxid 
)
static

Definition at line 1729 of file worker.c.

1730 {
1731  /*
1732  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1733  * just delete the files with serialized info.
1734  */
1735  if (xid == subxid)
1737  else
1738  {
1739  /*
1740  * OK, so it's a subxact. We need to read the subxact file for the
1741  * toplevel transaction, determine the offset tracked for the subxact,
1742  * and truncate the file with changes. We also remove the subxacts
1743  * with higher offsets (or rather higher XIDs).
1744  *
1745  * We intentionally scan the array from the tail, because we're likely
1746  * aborting a change for the most recent subtransactions.
1747  *
1748  * We can't use binary search here, because subxact XIDs won't
1749  * necessarily arrive in sorted order. Consider the case where we
1750  * have released the savepoints for multiple subtransactions and then
1751  * performed a rollback to savepoint for one of the earlier
1752  * subtransactions.
1753  */
1754  int64 i;
1755  int64 subidx;
1756  BufFile *fd;
1757  bool found = false;
1758  char path[MAXPGPATH];
1759 
1760  subidx = -1;
1763 
1764  for (i = subxact_data.nsubxacts; i > 0; i--)
1765  {
1766  if (subxact_data.subxacts[i - 1].xid == subxid)
1767  {
1768  subidx = (i - 1);
1769  found = true;
1770  break;
1771  }
1772  }
1773 
1774  /*
1775  * If it's an empty sub-transaction then we will not find the subxid
1776  * here so just cleanup the subxact info and return.
1777  */
1778  if (!found)
1779  {
1780  /* Cleanup the subxact info */
1784  return;
1785  }
1786 
1787  /* open the changes file */
1790  O_RDWR, false);
1791 
1792  /* OK, truncate the file at the right offset */
1794  subxact_data.subxacts[subidx].offset);
1795  BufFileClose(fd);
1796 
1797  /* discard the subxacts added later */
1798  subxact_data.nsubxacts = subidx;
1799 
1800  /* write the updated subxact list */
1802 
1805  }
1806 }
static void cleanup_subxact_info(void)
Definition: worker.c:4388
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4006
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4055
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:933
off_t offset
Definition: worker.c:345
TransactionId xid
Definition: worker.c:343
int fileno
Definition: worker.c:344

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), fd(), SubXactInfo::fileno, i, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_handle_stream_abort().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 4205 of file worker.c.

4206 {
4207  char path[MAXPGPATH];
4208 
4209  /* Delete the changes file. */
4210  changes_filename(path, subid, xid);
4212 
4213  /* Delete the subxact file, if it exists. */
4214  subxact_filename(path, subid, xid);
4216 }
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4184
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_close_file()

static void stream_close_file ( void  )
static

Definition at line 4274 of file worker.c.

4275 {
4276  Assert(stream_fd != NULL);
4277 
4279 
4280  stream_fd = NULL;
4281 }

References Assert, BufFileClose(), and stream_fd.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_spooled_messages(), and stream_stop_internal().

◆ stream_open_and_write_change()

static void stream_open_and_write_change ( TransactionId  xid,
char  action,
StringInfo  s 
)
static

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 4229 of file worker.c.

4230 {
4231  char path[MAXPGPATH];
4232  MemoryContext oldcxt;
4233 
4234  Assert(OidIsValid(subid));
4236  Assert(stream_fd == NULL);
4237 
4238 
4239  changes_filename(path, subid, xid);
4240  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4241 
4242  /*
4243  * Create/open the buffiles under the logical streaming context so that we
4244  * have those files until stream stop.
4245  */
4247 
4248  /*
4249  * If this is the first streamed segment, create the changes file.
4250  * Otherwise, just open the file for writing, in append mode.
4251  */
4252  if (first_segment)
4254  path);
4255  else
4256  {
4257  /*
4258  * Open the file and seek to the end of the file because we always
4259  * append the changes file.
4260  */
4262  path, O_RDWR, false);
4263  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4264  }
4265 
4266  MemoryContextSwitchTo(oldcxt);
4267 }
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267

References Assert, BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by stream_start_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1429 of file worker.c.

1430 {
1432 
1433  /*
1434  * Initialize the worker's stream_fileset if we haven't yet. This will be
1435  * used for the entire duration of the worker so create it in a permanent
1436  * context. We create this on the very first streaming message from any
1437  * transaction and then use it for this and other streaming transactions.
1438  * Now, we could create a fileset at the start of the worker as well but
1439  * then we won't be sure that it will ever be used.
1440  */
1442  {
1443  MemoryContext oldctx;
1444 
1446 
1449 
1450  MemoryContextSwitchTo(oldctx);
1451  }
1452 
1453  /* Open the spool file for this transaction. */
1454  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1455 
1456  /* If this is not the first segment, open existing subxact file. */
1457  if (!first_segment)
1459 
1461 }
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4229
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1603 of file worker.c.

1604 {
1605  /*
1606  * Serialize information about subxacts for the toplevel transaction, then
1607  * close the stream messages spool file.
1608  */
1611 
1612  /* We must be in a valid transaction state */
1614 
1615  /* Commit the per-stream transaction */
1617 
1618  /* Reset per-stream context */
1620 }

References Assert, CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 4292 of file worker.c.

4293 {
4294  int len;
4295 
4296  Assert(stream_fd != NULL);
4297 
4298  /* total on-disk size, including the action type character */
4299  len = (s->len - s->cursor) + sizeof(char);
4300 
4301  /* first write the size */
4302  BufFileWrite(stream_fd, &len, sizeof(len));
4303 
4304  /* then the action */
4305  BufFileWrite(stream_fd, &action, sizeof(action));
4306 
4307  /* and finally the remaining part of the buffer (after the XID) */
4308  len = (s->len - s->cursor);
4309 
4310  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4311 }
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676

References action, Assert, BufFileWrite(), StringInfoData::cursor, StringInfoData::data, StringInfoData::len, len, and stream_fd.

Referenced by apply_handle_stream_start(), apply_handle_stream_stop(), handle_streamed_transaction(), and stream_open_and_write_change().

◆ subscription_change_cb()

/*
 * subscription_change_cb
 *
 * Callback that marks the cached subscription as stale by clearing
 * MySubscriptionValid.  None of the arguments (arg, cacheid, hashvalue) are
 * inspected.
 *
 * NOTE(review): the signature suggests this is registered as a cache
 * invalidation callback by InitializeLogRepWorker() -- confirm at the
 * registration site.
 *
 * Definition at line 3991 of file worker.c.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
    MySubscriptionValid = false;
}

References MySubscriptionValid.

Referenced by InitializeLogRepWorker().

◆ subxact_filename()

/*
 * subxact_filename
 *
 * Build the name of the file that stores subtransaction information for the
 * given subscription and toplevel transaction: "<subid>-<xid>.subxacts".
 *
 * path must point to a buffer of at least MAXPGPATH bytes; the result is
 * truncated to fit if necessary.
 *
 * Definition at line 4184 of file worker.c.
 */
static inline void
subxact_filename(char *path, Oid subid, TransactionId xid)
{
    snprintf(path, MAXPGPATH,
             "%u-%u.subxacts",
             subid, xid);
}

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

/*
 * subxact_info_add
 *
 * Remember that the streamed toplevel transaction (stream_xid) contains
 * subtransaction xid, together with the position in the stream spool file at
 * which that subtransaction's changes begin.
 *
 * Nothing is recorded when xid is the toplevel transaction itself, when it
 * is the same subxact as in the previous call, or when it is already present
 * in subxact_data.subxacts.
 *
 * The array lives in LogicalStreamingContext and is grown by doubling.
 *
 * Definition at line 4106 of file worker.c.
 */
static void
subxact_info_add(TransactionId xid)
{
    SubXactInfo *subxacts = subxact_data.subxacts;
    int64       i;

    /* We must have a valid top level stream xid and a stream fd. */
    Assert(TransactionIdIsValid(stream_xid));
    Assert(stream_fd != NULL);

    /*
     * If the XID matches the toplevel transaction, we don't want to add it.
     */
    if (stream_xid == xid)
        return;

    /*
     * In most cases we're checking the same subxact as we've already seen in
     * the last call, so make sure to ignore it (this change comes later).
     */
    if (subxact_data.subxact_last == xid)
        return;

    /* OK, remember we're processing this XID. */
    subxact_data.subxact_last = xid;

    /*
     * Check if the transaction is already present in the array of subxact. We
     * intentionally scan the array from the tail, because we're likely adding
     * a change for the most recent subtransactions.
     *
     * XXX Can we rely on the subxact XIDs arriving in sorted order? That
     * would allow us to use binary search here.
     */
    for (i = subxact_data.nsubxacts; i > 0; i--)
    {
        /* found, so we're done */
        if (subxacts[i - 1].xid == xid)
            return;
    }

    /* This is a new subxact, so we need to add it to the array. */
    if (subxact_data.nsubxacts == 0)
    {
        MemoryContext oldctx;

        /* initial capacity; kept a power of 2, see subxact_info_read */
        subxact_data.nsubxacts_max = 128;

        /*
         * Allocate this memory for subxacts in per-stream context, see
         * subxact_info_read.
         */
        oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
        subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
        MemoryContextSwitchTo(oldctx);
    }
    else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
    {
        /* array is full: double its capacity */
        subxact_data.nsubxacts_max *= 2;
        subxacts = repalloc(subxacts,
                            subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    }

    subxacts[subxact_data.nsubxacts].xid = xid;

    /*
     * Get the current offset of the stream file and store it as offset of
     * this subxact.
     */
    BufFileTell(stream_fd,
                &subxacts[subxact_data.nsubxacts].fileno,
                &subxacts[subxact_data.nsubxacts].offset);

    subxact_data.nsubxacts++;
    subxact_data.subxacts = subxacts;
}

References Assert, BufFileTell(), SubXactInfo::fileno, i, LogicalStreamingContext, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc(), repalloc(), stream_fd, stream_xid, subxact_data, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

◆ subxact_info_read()

/*
 * subxact_info_read
 *
 * Load the subtransaction information previously written for toplevel
 * transaction xid of subscription subid into subxact_data.
 *
 * If no subxact file exists for the transaction, subxact_data is left
 * untouched (i.e. empty).  Otherwise the array is allocated in
 * LogicalStreamingContext with a capacity derived from the stored count via
 * my_log2().
 *
 * Callers must invoke this only while subxact_data is empty.
 *
 * Definition at line 4055 of file worker.c.
 */
static void
subxact_info_read(Oid subid, TransactionId xid)
{
    char        path[MAXPGPATH];
    Size        len;
    BufFile    *fd;
    MemoryContext oldctx;

    Assert(!subxact_data.subxacts);
    Assert(subxact_data.nsubxacts == 0);
    Assert(subxact_data.nsubxacts_max == 0);

    /*
     * If the subxact file doesn't exist that means we don't have any subxact
     * info.
     */
    subxact_filename(path, subid, xid);
    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
                            true);
    if (fd == NULL)
        return;

    /* read number of subxact items */
    BufFileReadExact(fd, &subxact_data.nsubxacts,
                     sizeof(subxact_data.nsubxacts));

    len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

    /* we keep the maximum as a power of 2 */
    subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);

    /*
     * Allocate subxact information in the logical streaming context. We need
     * this information during the complete stream so that we can add the sub
     * transaction info to this. On stream stop we will flush this information
     * to the subxact file and reset the logical streaming context.
     */
    oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
                                   sizeof(SubXactInfo));
    MemoryContextSwitchTo(oldctx);

    /* an empty file body is legal; only read when there is something stored */
    if (len > 0)
        BufFileReadExact(fd, subxact_data.subxacts, len);

    BufFileClose(fd);
}
struct SubXactInfo SubXactInfo
size_t Size
Definition: c.h:605
int my_log2(long num)
Definition: dynahash.c:1751

References Assert, BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), fd(), len, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), my_log2(), MyLogicalRepWorker, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc(), LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and stream_start_internal().

◆ subxact_info_write()

/*
 * subxact_info_write
 *
 * Persist the in-memory subtransaction information (subxact_data) for
 * toplevel transaction xid of subscription subid, then release the in-memory
 * copy.
 *
 * When there are no subtransactions to record, any existing subxact file is
 * removed instead.  Otherwise the file is opened (or created if it does not
 * exist yet) and the subxact count followed by the SubXactInfo array is
 * written to it.
 *
 * Definition at line 4006 of file worker.c.
 */
static void
subxact_info_write(Oid subid, TransactionId xid)
{
    char        path[MAXPGPATH];
    Size        len;
    BufFile    *fd;

    Assert(TransactionIdIsValid(xid));

    /* construct the subxact filename */
    subxact_filename(path, subid, xid);

    /* Delete the subxacts file, if exists. */
    if (subxact_data.nsubxacts == 0)
    {
        cleanup_subxact_info();
        BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);

        return;
    }

    /*
     * Create the subxact file if it not already created, otherwise open the
     * existing file.
     */
    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
                            true);
    if (fd == NULL)
        fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);

    len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

    /* Write the subxact count and subxact info */
    BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    BufFileWrite(fd, subxact_data.subxacts, len);

    BufFileClose(fd);

    /* free the memory allocated for subxact info */
    cleanup_subxact_info();
}

References Assert, BufFileClose(), BufFileCreateFileSet(), BufFileDeleteFileSet(), BufFileOpenFileSet(), BufFileWrite(), cleanup_subxact_info(), fd(), len, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by stream_abort_internal(), and stream_stop_internal().

◆ TargetPrivilegesCheck()

/*
 * TargetPrivilegesCheck
 *
 * Verify that the current user may perform the operation described by mode
 * on the target relation rel.
 *
 * Reports an error through aclcheck_error() if the relation-level ACL check
 * fails, and raises ERRCODE_FEATURE_NOT_SUPPORTED if row-level security is
 * enabled on the relation for this user.
 *
 * Definition at line 2339 of file worker.c.
 */
static void
TargetPrivilegesCheck(Relation rel, AclMode mode)
{
    Oid         target_oid = RelationGetRelid(rel);
    Oid         userid = GetUserId();
    AclResult   acl_outcome;

    /* Plain relation-level privilege check first. */
    acl_outcome = pg_class_aclcheck(target_oid, userid, mode);
    if (acl_outcome != ACLCHECK_OK)
        aclcheck_error(acl_outcome,
                       get_relkind_objtype(rel->rd_rel->relkind),
                       get_rel_name(target_oid));

    /* Nothing more to verify unless RLS applies to this relation. */
    if (check_enable_rls(target_oid, InvalidOid, false) != RLS_ENABLED)
        return;

    /*
     * We lack the infrastructure to honor RLS policies.  It might be possible
     * to add such infrastructure here, but tablesync workers lack it, too, so
     * we don't bother.  RLS does not ordinarily apply to TRUNCATE commands,
     * but it seems dangerous to replicate a TRUNCATE and then refuse to
     * replicate subsequent INSERTs, so we forbid all commands the same.
     */
    ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                    GetUserNameFromId(userid, true),
                    RelationGetRelationName(rel))));
}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2688
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4079
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:980
Oid GetUserId(void)
Definition: miscinit.c:514
ObjectType get_relkind_objtype(char relkind)
static PgChecksumMode mode
Definition: pg_checksums.c:56
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45

References aclcheck_error(), ACLCHECK_OK, check_enable_rls(), ereport, errcode(), errmsg(), ERROR, get_rel_name(), get_relkind_objtype(), GetUserId(), GetUserNameFromId(), InvalidOid, mode, pg_class_aclcheck(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, and RLS_ENABLED.

Referenced by apply_handle_delete_internal(), apply_handle_insert_internal(), apply_handle_truncate(), apply_handle_tuple_routing(), apply_handle_update_internal(), and FindReplTupleInLocalRel().

◆ TwoPhaseTransactionGid()

/*
 * TwoPhaseTransactionGid
 *
 * Form the global transaction identifier used for a prepared transaction
 * applied on behalf of subscription subid and remote transaction xid.  The
 * result has the form "pg_gid_<subid>_<xid>" and is written to gid, which
 * must have room for szgid bytes (the output is truncated to fit).
 *
 * Raises ERROR (ERRCODE_PROTOCOL_VIOLATION) if xid is not a valid
 * transaction ID.
 *
 * Definition at line 4405 of file worker.c.
 */
static void
TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
{
    /*
     * subid is an Oid, so validate it as one rather than comparing against
     * the replication-origin constant InvalidRepOriginId.
     */
    Assert(OidIsValid(subid));

    if (!TransactionIdIsValid(xid))
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("invalid two-phase transaction ID")));

    snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
#define InvalidRepOriginId
Definition: origin.h:33

References Assert, ereport, errcode(), errmsg_internal(), ERROR, InvalidRepOriginId, snprintf, and TransactionIdIsValid.

Referenced by apply_handle_commit_prepared(), apply_handle_prepare_internal(), and apply_handle_rollback_prepared().

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Variable Documentation

◆ apply_error_callback_arg

ApplyErrorCallbackArg apply_error_callback_arg
Initial value:
=
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}

Definition at line 278 of file worker.c.

Referenced by apply_dispatch(), apply_error_callback(), apply_handle_delete(), apply_handle_insert(), apply_handle_update(), reset_apply_error_context_info(), set_apply_error_context_origin(), set_apply_error_context_xact(), slot_modify_data(), and slot_store_data().

◆ apply_error_context_stack

ErrorContextCallback* apply_error_context_stack = NULL

Definition at line 288 of file worker.c.

Referenced by HandleParallelApplyMessage(), and LogicalRepApplyLoop().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ in_streamed_transaction

◆ InitializingApplyWorker

bool InitializingApplyWorker = false

Definition at line 318 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogicalStreamingContext

MemoryContext LogicalStreamingContext = NULL
static

◆ LogRepWorkerWalRcvConn

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 204 of file worker.c.

Referenced by get_flush_position(), LogicalRepApplyLoop(), and store_flush_position().

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false
static

◆ on_commit_wakeup_workers_subids

List* on_commit_wakeup_workers_subids = NIL
static

Definition at line 301 of file worker.c.

Referenced by AtEOXact_LogicalRepWorkers(), and LogicalRepWorkersWakeupAtCommit().

◆ parallel_stream_nchanges

uint32 parallel_stream_nchanges = 0
static

◆ remote_final_lsn

◆ skip_xact_finish_lsn

XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
static

Definition at line 335 of file worker.c.

Referenced by maybe_start_skipping_changes(), and stop_skipping_changes().

◆ stream_fd

◆ stream_xid

◆ subxact_data