PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionData * create_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void apply_worker_exit (void)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker ()
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallback * apply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConn * LogRepWorkerWalRcvConn = NULL
 
Subscription * MySubscription = NULL
 
static bool MySubscriptionValid = false
 
static List * on_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFile * stream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 336 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 195 of file worker.c.

Typedef Documentation

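◆ ApplyErrorCallbackArg

typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg

◆ ApplyExecutionData

typedef struct ApplyExecutionData ApplyExecutionData

◆ ApplySubXactData

typedef struct ApplySubXactData ApplySubXactData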

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 265 of file worker.c.

266 {
267  /* The action for non-streaming transactions. */
269 
270  /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:266
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:271
@ TRANS_PARALLEL_APPLY
Definition: worker.c:274
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:272
@ TRANS_LEADER_APPLY
Definition: worker.c:268
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:273

Function Documentation

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3269 of file worker.c.

3270 {
3272  LogicalRepMsgType saved_command;
3273 
3274  /*
3275  * Set the current command being applied. Since this function can be
3276  * called recursively when applying spooled changes, save the current
3277  * command.
3278  */
3279  saved_command = apply_error_callback_arg.command;
3281 
3282  switch (action)
3283  {
3284  case LOGICAL_REP_MSG_BEGIN:
3285  apply_handle_begin(s);
3286  break;
3287 
3290  break;
3291 
3294  break;
3295 
3298  break;
3299 
3302  break;
3303 
3306  break;
3307 
3310  break;
3311 
3312  case LOGICAL_REP_MSG_TYPE:
3313  apply_handle_type(s);
3314  break;
3315 
3318  break;
3319 
3321 
3322  /*
3323  * Logical replication does not use generic logical messages yet.
3324  * Although, it could be used by other applications that use this
3325  * output plugin.
3326  */
3327  break;
3328 
3331  break;
3332 
3335  break;
3336 
3339  break;
3340 
3343  break;
3344 
3347  break;
3348 
3351  break;
3352 
3355  break;
3356 
3359  break;
3360 
3363  break;
3364 
3365  default:
3366  ereport(ERROR,
3367  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3368  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3369  }
3370 
3371  /* Reset the current command */
3372  apply_error_callback_arg.command = saved_command;
3373 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1268
static void apply_handle_type(StringInfo s)
Definition: worker.c:2321
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3141
static void apply_handle_update(StringInfo s)
Definition: worker.c:2517
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2128
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1166
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:278
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2701
static void apply_handle_begin(StringInfo s)
Definition: worker.c:988
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1013
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1809
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2298
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1105
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1215
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1623
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1405
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1039
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1464
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2368
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:221

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4873 of file worker.c.

4874 {
4876 
4878  return;
4879 
4880  Assert(errarg->origin_name);
4881 
4882  if (errarg->rel == NULL)
4883  {
4884  if (!TransactionIdIsValid(errarg->remote_xid))
4885  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4886  errarg->origin_name,
4887  logicalrep_message_type(errarg->command));
4888  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4889  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4890  errarg->origin_name,
4892  errarg->remote_xid);
4893  else
4894  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4895  errarg->origin_name,
4897  errarg->remote_xid,
4898  LSN_FORMAT_ARGS(errarg->finish_lsn));
4899  }
4900  else
4901  {
4902  if (errarg->remote_attnum < 0)
4903  {
4904  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4905  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4906  errarg->origin_name,
4908  errarg->rel->remoterel.nspname,
4909  errarg->rel->remoterel.relname,
4910  errarg->remote_xid);
4911  else
4912  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4913  errarg->origin_name,
4915  errarg->rel->remoterel.nspname,
4916  errarg->rel->remoterel.relname,
4917  errarg->remote_xid,
4918  LSN_FORMAT_ARGS(errarg->finish_lsn));
4919  }
4920  else
4921  {
4922  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4923  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4924  errarg->origin_name,
4926  errarg->rel->remoterel.nspname,
4927  errarg->rel->remoterel.relname,
4928  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4929  errarg->remote_xid);
4930  else
4931  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4932  errarg->origin_name,
4934  errarg->rel->remoterel.nspname,
4935  errarg->rel->remoterel.relname,
4936  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4937  errarg->remote_xid,
4938  LSN_FORMAT_ARGS(errarg->finish_lsn));
4939  }
4940  }
4941 }
#define Assert(condition)
Definition: c.h:858
#define errcontext
Definition: elog.h:196
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:226
XLogRecPtr finish_lsn
Definition: worker.c:227
LogicalRepRelMapEntry * rel
Definition: worker.c:222
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert, LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 988 of file worker.c.

989 {
990  LogicalRepBeginData begin_data;
991 
992  /* There must not be an active streaming transaction. */
994 
995  logicalrep_read_begin(s, &begin_data);
996  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
997 
998  remote_final_lsn = begin_data.final_lsn;
999 
1001 
1002  in_remote_transaction = true;
1003 
1005 }
bool in_remote_transaction
Definition: worker.c:303
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4945
static XLogRecPtr remote_final_lsn
Definition: worker.c:304
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4743
static TransactionId stream_xid
Definition: worker.c:309
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert, LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1039 of file worker.c.

1040 {
1041  LogicalRepPreparedTxnData begin_data;
1042 
1043  /* Tablesync should never receive prepare. */
1044  if (am_tablesync_worker())
1045  ereport(ERROR,
1046  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1047  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1048 
1049  /* There must not be an active streaming transaction. */
1051 
1052  logicalrep_read_begin_prepare(s, &begin_data);
1053  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1054 
1055  remote_final_lsn = begin_data.prepare_lsn;
1056 
1058 
1059  in_remote_transaction = true;
1060 
1062 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert, ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1013 of file worker.c.

1014 {
1015  LogicalRepCommitData commit_data;
1016 
1017  logicalrep_read_commit(s, &commit_data);
1018 
1019  if (commit_data.commit_lsn != remote_final_lsn)
1020  ereport(ERROR,
1021  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1022  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1023  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1025 
1026  apply_handle_commit_internal(&commit_data);
1027 
1028  /* Process any tables that are being synchronized in parallel. */
1029  process_syncing_tables(commit_data.end_lsn);
1030 
1033 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2238
static void reset_apply_error_context_info(void)
Definition: worker.c:4953
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData *  commit_data)
static

Definition at line 2238 of file worker.c.

2239 {
2240  if (is_skipping_changes())
2241  {
2243 
2244  /*
2245  * Start a new transaction to clear the subskiplsn, if not started
2246  * yet.
2247  */
2248  if (!IsTransactionState())
2250  }
2251 
2252  if (IsTransactionState())
2253  {
2254  /*
2255  * The transaction is either non-empty or skipped, so we clear the
2256  * subskiplsn.
2257  */
2259 
2260  /*
2261  * Update origin state so we can restart streaming from correct
2262  * position in case of crash.
2263  */
2264  replorigin_session_origin_lsn = commit_data->end_lsn;
2266 
2268 
2269  if (IsTransactionBlock())
2270  {
2271  EndTransactionBlock(false);
2273  }
2274 
2275  pgstat_report_stat(false);
2276 
2278  }
2279  else
2280  {
2281  /* Process any invalidation messages that might have accumulated. */
2284  }
2285 
2286  in_remote_transaction = false;
2287 }
static void stop_skipping_changes(void)
Definition: worker.c:4770
#define is_skipping_changes()
Definition: worker.c:336
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4792
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3433
void maybe_reread_subscription(void)
Definition: worker.c:3859
void AcceptInvalidationMessages(void)
Definition: inval.c:806
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
long pgstat_report_stat(bool force)
Definition: pgstat.c:622
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:385
void StartTransactionCommand(void)
Definition: xact.c:3033
bool IsTransactionBlock(void)
Definition: xact.c:4958
void CommitTransactionCommand(void)
Definition: xact.c:3131
bool EndTransactionBlock(bool chain)
Definition: xact.c:4031
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:253

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1166 of file worker.c.

1167 {
1168  LogicalRepCommitPreparedTxnData prepare_data;
1169  char gid[GIDSIZE];
1170 
1171  logicalrep_read_commit_prepared(s, &prepare_data);
1172  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1173 
1174  /* Compute GID for two_phase transactions. */
1176  gid, sizeof(gid));
1177 
1178  /* There is no transaction when COMMIT PREPARED is called */
1180 
1181  /*
1182  * Update origin state so we can restart streaming from correct position
1183  * in case of crash.
1184  */
1185  replorigin_session_origin_lsn = prepare_data.end_lsn;
1187 
1188  FinishPreparedTransaction(gid, true);
1191  pgstat_report_stat(false);
1192 
1194  in_remote_transaction = false;
1195 
1196  /* Process any tables that are being synchronized in parallel. */
1197  process_syncing_tables(prepare_data.end_lsn);
1198 
1199  clear_subscription_skip_lsn(prepare_data.end_lsn);
1200 
1203 }
static void begin_replication_step(void)
Definition: worker.c:505
static void end_replication_step(void)
Definition: worker.c:528
Subscription * MySubscription
Definition: worker.c:298
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2692
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1503
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2701 of file worker.c.

2702 {
2703  LogicalRepRelMapEntry *rel;
2704  LogicalRepTupleData oldtup;
2705  LogicalRepRelId relid;
2706  UserContext ucxt;
2707  ApplyExecutionData *edata;
2708  EState *estate;
2709  TupleTableSlot *remoteslot;
2710  MemoryContext oldctx;
2711  bool run_as_owner;
2712 
2713  /*
2714  * Quick return if we are skipping data modification changes or handling
2715  * streamed transactions.
2716  */
2717  if (is_skipping_changes() ||
2719  return;
2720 
2722 
2723  relid = logicalrep_read_delete(s, &oldtup);
2724  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2725  if (!should_apply_changes_for_rel(rel))
2726  {
2727  /*
2728  * The relation can't become interesting in the middle of the
2729  * transaction so it's safe to unlock it.
2730  */
2733  return;
2734  }
2735 
2736  /* Set relation for error callback */
2738 
2739  /* Check if we can do the delete. */
2741 
2742  /*
2743  * Make sure that any user-supplied code runs as the table owner, unless
2744  * the user has opted out of that behavior.
2745  */
2746  run_as_owner = MySubscription->runasowner;
2747  if (!run_as_owner)
2748  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2749 
2750  /* Initialize the executor state. */
2751  edata = create_edata_for_relation(rel);
2752  estate = edata->estate;
2753  remoteslot = ExecInitExtraTupleSlot(estate,
2754  RelationGetDescr(rel->localrel),
2755  &TTSOpsVirtual);
2756 
2757  /* Build the search tuple. */
2759  slot_store_data(remoteslot, rel, &oldtup);
2760  MemoryContextSwitchTo(oldctx);
2761 
2762  /* For a partitioned table, apply delete to correct partition. */
2763  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2765  remoteslot, NULL, CMD_DELETE);
2766  else
2768  remoteslot, rel->localindexoid);
2769 
2770  finish_edata(edata);
2771 
2772  /* Reset relation for error callback */
2774 
2775  if (!run_as_owner)
2776  RestoreUserContext(&ucxt);
2777 
2779 
2781 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2476
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:649
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:465
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:556
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2890
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:794
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:706
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2789
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1918
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:555
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:268
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
Definition: rel.h:531
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:473
ResultRelInfo * targetRelInfo
Definition: worker.c:211
EState * estate
Definition: worker.c:208
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot,
Oid  localindexoid 
)
static

Definition at line 2789 of file worker.c.

2793 {
2794  EState *estate = edata->estate;
2795  Relation localrel = relinfo->ri_RelationDesc;
2796  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2797  EPQState epqstate;
2798  TupleTableSlot *localslot;
2799  bool found;
2800 
2801  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2802  ExecOpenIndices(relinfo, false);
2803 
2804  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2805  remoteslot, &localslot);
2806 
2807  /* If found delete it. */
2808  if (found)
2809  {
2810  EvalPlanQualSetSlot(&epqstate, localslot);
2811 
2812  /* Do the actual delete. */
2814  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2815  }
2816  else
2817  {
2818  /*
2819  * The tuple to be deleted could not be found. Do nothing except for
2820  * emitting a log message.
2821  *
2822  * XXX should this be promoted to ereport(LOG) perhaps?
2823  */
2824  elog(DEBUG1,
2825  "logical replication did not find row to be deleted "
2826  "in replication target relation \"%s\"",
2827  RelationGetRelationName(localrel));
2828  }
2829 
2830  /* Cleanup. */
2831  ExecCloseIndices(relinfo);
2832  EvalPlanQualEnd(&epqstate);
2833 }
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2843
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2336
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:224
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2539
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2982
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:244
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define RelationGetRelationName(relation)
Definition: rel.h:539
LogicalRepRelMapEntry * targetRel
Definition: worker.c:210
Relation ri_RelationDesc
Definition: execnodes.h:456

References ACL_DELETE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2368 of file worker.c.

2369 {
2370  LogicalRepRelMapEntry *rel;
2371  LogicalRepTupleData newtup;
2372  LogicalRepRelId relid;
2373  UserContext ucxt;
2374  ApplyExecutionData *edata;
2375  EState *estate;
2376  TupleTableSlot *remoteslot;
2377  MemoryContext oldctx;
2378  bool run_as_owner;
2379 
2380  /*
2381  * Quick return if we are skipping data modification changes or handling
2382  * streamed transactions.
2383  */
2384  if (is_skipping_changes() ||
2386  return;
2387 
2389 
2390  relid = logicalrep_read_insert(s, &newtup);
2391  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2392  if (!should_apply_changes_for_rel(rel))
2393  {
2394  /*
2395  * The relation can't become interesting in the middle of the
2396  * transaction so it's safe to unlock it.
2397  */
2400  return;
2401  }
2402 
2403  /*
2404  * Make sure that any user-supplied code runs as the table owner, unless
2405  * the user has opted out of that behavior.
2406  */
2407  run_as_owner = MySubscription->runasowner;
2408  if (!run_as_owner)
2409  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2410 
2411  /* Set relation for error callback */
2413 
2414  /* Initialize the executor state. */
2415  edata = create_edata_for_relation(rel);
2416  estate = edata->estate;
2417  remoteslot = ExecInitExtraTupleSlot(estate,
2418  RelationGetDescr(rel->localrel),
2419  &TTSOpsVirtual);
2420 
2421  /* Process and store remote tuple in the slot */
2423  slot_store_data(remoteslot, rel, &newtup);
2424  slot_fill_defaults(rel, estate, remoteslot);
2425  MemoryContextSwitchTo(oldctx);
2426 
2427  /* For a partitioned table, insert the tuple into a partition. */
2428  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2430  remoteslot, NULL, CMD_INSERT);
2431  else
2433  remoteslot);
2434 
2435  finish_edata(edata);
2436 
2437  /* Reset relation for error callback */
2439 
2440  if (!run_as_owner)
2441  RestoreUserContext(&ucxt);
2442 
2444 
2446 }
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2454
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:737
@ CMD_INSERT
Definition: nodes.h:267
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot 
)
static

Definition at line 2454 of file worker.c.

2457 {
2458  EState *estate = edata->estate;
2459 
2460  /* We must open indexes here. */
2461  ExecOpenIndices(relinfo, false);
2462 
2463  /* Do the insert. */
2465  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2466 
2467  /* Cleanup. */
2468  ExecCloseIndices(relinfo);
2469 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1405 of file worker.c.

1406 {
1407  /*
1408  * ORIGIN message can only come inside streaming transaction or inside
1409  * remote transaction and before any actual writes.
1410  */
1411  if (!in_streamed_transaction &&
1414  ereport(ERROR,
1415  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1416  errmsg_internal("ORIGIN message sent out of order")));
1417 }
static bool in_streamed_transaction
Definition: worker.c:307

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1105 of file worker.c.

1106 {
1107  LogicalRepPreparedTxnData prepare_data;
1108 
1109  logicalrep_read_prepare(s, &prepare_data);
1110 
1111  if (prepare_data.prepare_lsn != remote_final_lsn)
1112  ereport(ERROR,
1113  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1114  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1115  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1117 
1118  /*
1119  * Unlike commit, here, we always prepare the transaction even though no
1120  * change has happened in this transaction or all changes are skipped. It
1121  * is done this way because at commit prepared time, we won't know whether
1122  * we have skipped preparing a transaction because of those reasons.
1123  *
1124  * XXX, We can optimize such that at commit prepared time, we first check
1125  * whether we have prepared the transaction or not but that doesn't seem
1126  * worthwhile because such cases shouldn't be common.
1127  */
1129 
1130  apply_handle_prepare_internal(&prepare_data);
1131 
1134  pgstat_report_stat(false);
1135 
1137 
1138  in_remote_transaction = false;
1139 
1140  /* Process any tables that are being synchronized in parallel. */
1141  process_syncing_tables(prepare_data.end_lsn);
1142 
1143  /*
1144  * Since we have already prepared the transaction, in a case where the
1145  * server crashes before clearing the subskiplsn, it will be left but the
1146  * transaction won't be resent. But that's okay because it's a rare case
1147  * and the subskiplsn will be cleared when finishing the next transaction.
1148  */
1151 
1154 }
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1068
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData * prepare_data)
static

Definition at line 1068 of file worker.c.

1069 {
1070  char gid[GIDSIZE];
1071 
1072  /*
1073  * Compute unique GID for two_phase transactions. We don't use GID of
1074  * prepared transaction sent by server as that can lead to deadlock when
1075  * we have multiple subscriptions from same node point to publications on
1076  * the same node. See comments atop worker.c
1077  */
1078  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1079  gid, sizeof(gid));
1080 
1081  /*
1082  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1083  * called within the PrepareTransactionBlock below.
1084  */
1085  if (!IsTransactionBlock())
1086  {
1088  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1089  }
1090 
1091  /*
1092  * Update origin state so we can restart streaming from correct position
1093  * in case of crash.
1094  */
1095  replorigin_session_origin_lsn = prepare_data->end_lsn;
1097 
1099 }
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3979
void BeginTransactionBlock(void)
Definition: xact.c:3911

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2298 of file worker.c.

2299 {
2300  LogicalRepRelation *rel;
2301 
2303  return;
2304 
2305  rel = logicalrep_read_rel(s);
2307 
2308  /* Also reset all entries in the partition map that refer to remoterel. */
2310 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:540
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1215 of file worker.c.

1216 {
1217  LogicalRepRollbackPreparedTxnData rollback_data;
1218  char gid[GIDSIZE];
1219 
1220  logicalrep_read_rollback_prepared(s, &rollback_data);
1221  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1222 
1223  /* Compute GID for two_phase transactions. */
1224  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1225  gid, sizeof(gid));
1226 
1227  /*
1228  * It is possible that we haven't received prepare because it occurred
1229  * before walsender reached a consistent point or the two_phase was still
1230  * not enabled by that time, so in such cases, we need to skip rollback
1231  * prepared.
1232  */
1233  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1234  rollback_data.prepare_time))
1235  {
1236  /*
1237  * Update origin state so we can restart streaming from correct
1238  * position in case of crash.
1239  */
1242 
1243  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1245  FinishPreparedTransaction(gid, false);
1248 
1250  }
1251 
1252  pgstat_report_stat(false);
1253 
1255  in_remote_transaction = false;
1256 
1257  /* Process any tables that are being synchronized in parallel. */
1259 
1262 }
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2633

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1809 of file worker.c.

1810 {
1811  TransactionId xid;
1812  TransactionId subxid;
1813  LogicalRepStreamAbortData abort_data;
1814  ParallelApplyWorkerInfo *winfo;
1815  TransApplyAction apply_action;
1816 
1817  /* Save the message before it is consumed. */
1818  StringInfoData original_msg = *s;
1819  bool toplevel_xact;
1820 
1822  ereport(ERROR,
1823  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1824  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1825 
1826  /* We receive abort information only when we can apply in parallel. */
1827  logicalrep_read_stream_abort(s, &abort_data,
1829 
1830  xid = abort_data.xid;
1831  subxid = abort_data.subxid;
1832  toplevel_xact = (xid == subxid);
1833 
1834  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1835 
1836  apply_action = get_transaction_apply_action(xid, &winfo);
1837 
1838  switch (apply_action)
1839  {
1840  case TRANS_LEADER_APPLY:
1841 
1842  /*
1843  * We are in the leader apply worker and the transaction has been
1844  * serialized to file.
1845  */
1846  stream_abort_internal(xid, subxid);
1847 
1848  elog(DEBUG1, "finished processing the STREAM ABORT command");
1849  break;
1850 
1852  Assert(winfo);
1853 
1854  /*
1855  * For the case of aborting the subtransaction, we increment the
1856  * number of streaming blocks and take the lock again before
1857  * sending the STREAM_ABORT to ensure that the parallel apply
1858  * worker will wait on the lock for the next set of changes after
1859  * processing the STREAM_ABORT message if it is not already
1860  * waiting for STREAM_STOP message.
1861  *
1862  * It is important to perform this locking before sending the
1863  * STREAM_ABORT message so that the leader can hold the lock first
1864  * and the parallel apply worker will wait for the leader to
1865  * release the lock. This is the same as what we do in
1866  * apply_handle_stream_stop. See Locking Considerations atop
1867  * applyparallelworker.c.
1868  */
1869  if (!toplevel_xact)
1870  {
1874  }
1875 
1876  if (pa_send_data(winfo, s->len, s->data))
1877  {
1878  /*
1879  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1880  * wait here for the parallel apply worker to finish as that
1881  * is not required to maintain the commit order and won't have
1882  * the risk of failures due to transaction dependencies and
1883  * deadlocks. However, it is possible that before the parallel
1884  * worker finishes and we clear the worker info, the xid
1885  * wraparound happens on the upstream and a new transaction
1886  * with the same xid can appear and that can lead to duplicate
1887  * entries in ParallelApplyTxnHash. Yet another problem could
1888  * be that we may have serialized the changes in partial
1889  * serialize mode and the file containing xact changes may
1890  * already exist, and after xid wraparound trying to create
1891  * the file for the same xid can lead to an error. To avoid
1892  * these problems, we decide to wait for the aborts to finish.
1893  *
1894  * Note, it is okay to not update the flush location position
1895  * for aborts as in worst case that means such a transaction
1896  * won't be sent again after restart.
1897  */
1898  if (toplevel_xact)
1900 
1901  break;
1902  }
1903 
1904  /*
1905  * Switch to serialize mode when we are not able to send the
1906  * change to parallel apply worker.
1907  */
1908  pa_switch_to_partial_serialize(winfo, true);
1909 
1910  /* fall through */
1912  Assert(winfo);
1913 
1914  /*
1915  * Parallel apply worker might have applied some changes, so write
1916  * the STREAM_ABORT message so that it can rollback the
1917  * subtransaction if needed.
1918  */
1920  &original_msg);
1921 
1922  if (toplevel_xact)
1923  {
1926  }
1927  break;
1928 
1929  case TRANS_PARALLEL_APPLY:
1930 
1931  /*
1932  * If the parallel apply worker is applying spooled messages then
1933  * close the file before aborting.
1934  */
1935  if (toplevel_xact && stream_fd)
1937 
1938  pa_stream_abort(&abort_data);
1939 
1940  /*
1941  * We need to wait after processing rollback to savepoint for the
1942  * next set of changes.
1943  *
1944  * We have a race condition here due to which we can start waiting
1945  * here when there are more chunks of streams in the queue. See
1946  * apply_handle_stream_stop.
1947  */
1948  if (!toplevel_xact)
1950 
1951  elog(DEBUG1, "finished processing the STREAM ABORT command");
1952  break;
1953 
1954  default:
1955  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1956  break;
1957  }
1958 
1960 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:417
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5030
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4319
static BufFile * stream_fd
Definition: worker.c:339
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1726
static void stream_close_file(void)
Definition: worker.c:4271
uint32 TransactionId
Definition: c.h:652
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2128 of file worker.c.

2129 {
2130  TransactionId xid;
2131  LogicalRepCommitData commit_data;
2132  ParallelApplyWorkerInfo *winfo;
2133  TransApplyAction apply_action;
2134 
2135  /* Save the message before it is consumed. */
2136  StringInfoData original_msg = *s;
2137 
2139  ereport(ERROR,
2140  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2141  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2142 
2143  xid = logicalrep_read_stream_commit(s, &commit_data);
2144  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2145 
2146  apply_action = get_transaction_apply_action(xid, &winfo);
2147 
2148  switch (apply_action)
2149  {
2150  case TRANS_LEADER_APPLY:
2151 
2152  /*
2153  * The transaction has been serialized to file, so replay all the
2154  * spooled operations.
2155  */
2157  commit_data.commit_lsn);
2158 
2159  apply_handle_commit_internal(&commit_data);
2160 
2161  /* Unlink the files with serialized changes and subxact info. */
2163 
2164  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2165  break;
2166 
2168  Assert(winfo);
2169 
2170  if (pa_send_data(winfo, s->len, s->data))
2171  {
2172  /* Finish processing the streaming transaction. */
2173  pa_xact_finish(winfo, commit_data.end_lsn);
2174  break;
2175  }
2176 
2177  /*
2178  * Switch to serialize mode when we are not able to send the
2179  * change to parallel apply worker.
2180  */
2181  pa_switch_to_partial_serialize(winfo, true);
2182 
2183  /* fall through */
2185  Assert(winfo);
2186 
2188  &original_msg);
2189 
2191 
2192  /* Finish processing the streaming transaction. */
2193  pa_xact_finish(winfo, commit_data.end_lsn);
2194  break;
2195 
2196  case TRANS_PARALLEL_APPLY:
2197 
2198  /*
2199  * If the parallel apply worker is applying spooled messages then
2200  * close the file before committing.
2201  */
2202  if (stream_fd)
2204 
2205  apply_handle_commit_internal(&commit_data);
2206 
2208 
2209  /*
2210  * It is important to set the transaction state as finished before
2211  * releasing the lock. See pa_wait_for_xact_finish.
2212  */
2215 
2217 
2218  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2219  break;
2220 
2221  default:
2222  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2223  break;
2224  }
2225 
2226  /* Process any tables that are being synchronized in parallel. */
2227  process_syncing_tables(commit_data.end_lsn);
2228 
2230 
2232 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4202
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1998
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert, LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1268 of file worker.c.

1269 {
1270  LogicalRepPreparedTxnData prepare_data;
1271  ParallelApplyWorkerInfo *winfo;
1272  TransApplyAction apply_action;
1273 
1274  /* Save the message before it is consumed. */
1275  StringInfoData original_msg = *s;
1276 
1278  ereport(ERROR,
1279  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1280  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1281 
1282  /* Tablesync should never receive prepare. */
1283  if (am_tablesync_worker())
1284  ereport(ERROR,
1285  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1286  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1287 
1288  logicalrep_read_stream_prepare(s, &prepare_data);
1289  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1290 
1291  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1292 
1293  switch (apply_action)
1294  {
1295  case TRANS_LEADER_APPLY:
1296 
1297  /*
1298  * The transaction has been serialized to file, so replay all the
1299  * spooled operations.
1300  */
1302  prepare_data.xid, prepare_data.prepare_lsn);
1303 
1304  /* Mark the transaction as prepared. */
1305  apply_handle_prepare_internal(&prepare_data);
1306 
1308 
1310 
1311  in_remote_transaction = false;
1312 
1313  /* Unlink the files with serialized changes and subxact info. */
1315 
1316  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1317  break;
1318 
1320  Assert(winfo);
1321 
1322  if (pa_send_data(winfo, s->len, s->data))
1323  {
1324  /* Finish processing the streaming transaction. */
1325  pa_xact_finish(winfo, prepare_data.end_lsn);
1326  break;
1327  }
1328 
1329  /*
1330  * Switch to serialize mode when we are not able to send the
1331  * change to parallel apply worker.
1332  */
1333  pa_switch_to_partial_serialize(winfo, true);
1334 
1335  /* fall through */
1337  Assert(winfo);
1338 
1339  stream_open_and_write_change(prepare_data.xid,
1341  &original_msg);
1342 
1344 
1345  /* Finish processing the streaming transaction. */
1346  pa_xact_finish(winfo, prepare_data.end_lsn);
1347  break;
1348 
1349  case TRANS_PARALLEL_APPLY:
1350 
1351  /*
1352  * If the parallel apply worker is applying spooled messages then
1353  * close the file before preparing.
1354  */
1355  if (stream_fd)
1357 
1359 
1360  /* Mark the transaction as prepared. */
1361  apply_handle_prepare_internal(&prepare_data);
1362 
1364 
1366 
1368 
1371 
1373 
1374  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1375  break;
1376 
1377  default:
1378  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1379  break;
1380  }
1381 
1382  pgstat_report_stat(false);
1383 
1384  /* Process any tables that are being synchronized in parallel. */
1385  process_syncing_tables(prepare_data.end_lsn);
1386 
1387  /*
1388  * Similar to prepare case, the subskiplsn could be left in a case of
1389  * server crash but it's okay. See the comments in apply_handle_prepare().
1390  */
1393 
1395 
1397 }
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert, begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, XactLastCommitEnd, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1464 of file worker.c.

1465 {
1466  bool first_segment;
1467  ParallelApplyWorkerInfo *winfo;
1468  TransApplyAction apply_action;
1469 
1470  /* Save the message before it is consumed. */
1471  StringInfoData original_msg = *s;
1472 
1474  ereport(ERROR,
1475  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1476  errmsg_internal("duplicate STREAM START message")));
1477 
1478  /* There must not be an active streaming transaction. */
1480 
1481  /* notify handle methods we're processing a remote transaction */
1482  in_streamed_transaction = true;
1483 
1484  /* extract XID of the top-level transaction */
1485  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1486 
1488  ereport(ERROR,
1489  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1490  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1491 
1493 
1494  /* Try to allocate a worker for the streaming transaction. */
1495  if (first_segment)
1497 
1498  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1499 
1500  switch (apply_action)
1501  {
1503 
1504  /*
1505  * Function stream_start_internal starts a transaction. This
1506  * transaction will be committed on the stream stop unless it is a
1507  * tablesync worker in which case it will be committed after
1508  * processing all the messages. We need this transaction for
1509  * handling the BufFile, used for serializing the streaming data
1510  * and subxact info.
1511  */
1512  stream_start_internal(stream_xid, first_segment);
1513  break;
1514 
1516  Assert(winfo);
1517 
1518  /*
1519  * Once we start serializing the changes, the parallel apply
1520  * worker will wait for the leader to release the stream lock
1521  * until the end of the transaction. So, we don't need to release
1522  * the lock or increment the stream count in that case.
1523  */
1524  if (pa_send_data(winfo, s->len, s->data))
1525  {
1526  /*
1527  * Unlock the shared object lock so that the parallel apply
1528  * worker can continue to receive changes.
1529  */
1530  if (!first_segment)
1532 
1533  /*
1534  * Increment the number of streaming blocks waiting to be
1535  * processed by parallel apply worker.
1536  */
1538 
1539  /* Cache the parallel apply worker for this transaction. */
1541  break;
1542  }
1543 
1544  /*
1545  * Switch to serialize mode when we are not able to send the
1546  * change to parallel apply worker.
1547  */
1548  pa_switch_to_partial_serialize(winfo, !first_segment);
1549 
1550  /* fall through */
1552  Assert(winfo);
1553 
1554  /*
1555  * Open the spool file unless it was already opened when switching
1556  * to serialize mode. The transaction started in
1557  * stream_start_internal will be committed on the stream stop.
1558  */
1559  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1560  stream_start_internal(stream_xid, first_segment);
1561 
1563 
1564  /* Cache the parallel apply worker for this transaction. */
1566  break;
1567 
1568  case TRANS_PARALLEL_APPLY:
1569  if (first_segment)
1570  {
1571  /* Hold the lock until the end of the transaction. */
1574 
1575  /*
1576  * Signal the leader apply worker, as it may be waiting for
1577  * us.
1578  */
1580  }
1581 
1583  break;
1584 
1585  default:
1586  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1587  break;
1588  }
1589 
1591 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:315
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4289
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1426
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1623 of file worker.c.

1624 {
1625  ParallelApplyWorkerInfo *winfo;
1626  TransApplyAction apply_action;
1627 
1629  ereport(ERROR,
1630  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1631  errmsg_internal("STREAM STOP message without STREAM START")));
1632 
1633  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1634 
1635  switch (apply_action)
1636  {
1639  break;
1640 
1642  Assert(winfo);
1643 
1644  /*
1645  * Lock before sending the STREAM_STOP message so that the leader
1646  * can hold the lock first and the parallel apply worker will wait
1647  * for leader to release the lock. See Locking Considerations atop
1648  * applyparallelworker.c.
1649  */
1651 
1652  if (pa_send_data(winfo, s->len, s->data))
1653  {
1655  break;
1656  }
1657 
1658  /*
1659  * Switch to serialize mode when we are not able to send the
1660  * change to parallel apply worker.
1661  */
1662  pa_switch_to_partial_serialize(winfo, true);
1663 
1664  /* fall through */
1669  break;
1670 
1671  case TRANS_PARALLEL_APPLY:
1672  elog(DEBUG1, "applied %u changes in the streaming chunk",
1674 
1675  /*
1676  * By the time parallel apply worker is processing the changes in
1677  * the current streaming block, the leader apply worker may have
1678  * sent multiple streaming blocks. This can lead to the parallel apply
1679  * worker starting to wait even when there are more chunks of streams
1680  * in the queue. So, try to lock only if there is no message left
1681  * in the queue. See Locking Considerations atop
1682  * applyparallelworker.c.
1683  *
1684  * Note that here we have a race condition where we can start
1685  * waiting even when there are pending streaming chunks. This can
1686  * happen if the leader sends another streaming block and acquires
1687  * the stream lock again after the parallel apply worker checks
1688  * that there is no pending streaming block and before it actually
1689  * starts waiting on a lock. We can handle this case by not
1690  * allowing the leader to increment the stream block count during
1691  * the time parallel apply worker acquires the lock but it is not
1692  * clear whether that is worth the complexity.
1693  *
1694  * Now, if this missed chunk contains rollback to savepoint, then
1695  * there is a risk of deadlock which probably shouldn't happen
1696  * after restart.
1697  */
1699  break;
1700 
1701  default:
1702  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1703  break;
1704  }
1705 
1706  in_streamed_transaction = false;
1708 
1709  /*
1710  * The parallel apply worker could be in a transaction in which case we
1711  * need to report the state as STATE_IDLEINTRANSACTION.
1712  */
1715  else
1717 
1719 }
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1600
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4976

References AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3141 of file worker.c.

3142 {
3143  bool cascade = false;
3144  bool restart_seqs = false;
3145  List *remote_relids = NIL;
3146  List *remote_rels = NIL;
3147  List *rels = NIL;
3148  List *part_rels = NIL;
3149  List *relids = NIL;
3150  List *relids_logged = NIL;
3151  ListCell *lc;
3152  LOCKMODE lockmode = AccessExclusiveLock;
3153 
3154  /*
3155  * Quick return if we are skipping data modification changes or handling
3156  * streamed transactions.
3157  */
3158  if (is_skipping_changes() ||
3160  return;
3161 
3163 
3164  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3165 
3166  foreach(lc, remote_relids)
3167  {
3168  LogicalRepRelId relid = lfirst_oid(lc);
3169  LogicalRepRelMapEntry *rel;
3170 
3171  rel = logicalrep_rel_open(relid, lockmode);
3172  if (!should_apply_changes_for_rel(rel))
3173  {
3174  /*
3175  * The relation can't become interesting in the middle of the
3176  * transaction so it's safe to unlock it.
3177  */
3178  logicalrep_rel_close(rel, lockmode);
3179  continue;
3180  }
3181 
3182  remote_rels = lappend(remote_rels, rel);
3184  rels = lappend(rels, rel->localrel);
3185  relids = lappend_oid(relids, rel->localreloid);
3187  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3188 
3189  /*
3190  * Truncate partitions if we got a message to truncate a partitioned
3191  * table.
3192  */
3193  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3194  {
3195  ListCell *child;
3196  List *children = find_all_inheritors(rel->localreloid,
3197  lockmode,
3198  NULL);
3199 
3200  foreach(child, children)
3201  {
3202  Oid childrelid = lfirst_oid(child);
3203  Relation childrel;
3204 
3205  if (list_member_oid(relids, childrelid))
3206  continue;
3207 
3208  /* find_all_inheritors already got lock */
3209  childrel = table_open(childrelid, NoLock);
3210 
3211  /*
3212  * Ignore temp tables of other backends. See similar code in
3213  * ExecuteTruncate().
3214  */
3215  if (RELATION_IS_OTHER_TEMP(childrel))
3216  {
3217  table_close(childrel, lockmode);
3218  continue;
3219  }
3220 
3222  rels = lappend(rels, childrel);
3223  part_rels = lappend(part_rels, childrel);
3224  relids = lappend_oid(relids, childrelid);
3225  /* Log this relation only if needed for logical decoding */
3226  if (RelationIsLogicallyLogged(childrel))
3227  relids_logged = lappend_oid(relids_logged, childrelid);
3228  }
3229  }
3230  }
3231 
3232  /*
3233  * Even if we used CASCADE on the upstream primary we explicitly default
3234  * to replaying changes without further cascading. This might be later
3235  * changeable with a user specified option.
3236  *
3237  * MySubscription->runasowner tells us whether we want to execute
3238  * replication actions as the subscription owner; the last argument to
3239  * TruncateGuts tells it whether we want to switch to the table owner.
3240  * Those are exactly opposite conditions.
3241  */
3242  ExecuteTruncateGuts(rels,
3243  relids,
3244  relids_logged,
3245  DROP_RESTRICT,
3246  restart_seqs,
3248  foreach(lc, remote_rels)
3249  {
3250  LogicalRepRelMapEntry *rel = lfirst(lc);
3251 
3253  }
3254  foreach(lc, part_rels)
3255  {
3256  Relation rel = lfirst(lc);
3257 
3258  table_close(rel, NoLock);
3259  }
3260 
3262 }
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2334
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1884

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
CmdType  operation 
)
static

Definition at line 2890 of file worker.c.

2894 {
2895  EState *estate = edata->estate;
2896  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2897  ResultRelInfo *relinfo = edata->targetRelInfo;
2898  Relation parentrel = relinfo->ri_RelationDesc;
2899  ModifyTableState *mtstate;
2900  PartitionTupleRouting *proute;
2901  ResultRelInfo *partrelinfo;
2902  Relation partrel;
2903  TupleTableSlot *remoteslot_part;
2904  TupleConversionMap *map;
2905  MemoryContext oldctx;
2906  LogicalRepRelMapEntry *part_entry = NULL;
2907  AttrMap *attrmap = NULL;
2908 
2909  /* ModifyTableState is needed for ExecFindPartition(). */
2910  edata->mtstate = mtstate = makeNode(ModifyTableState);
2911  mtstate->ps.plan = NULL;
2912  mtstate->ps.state = estate;
2913  mtstate->operation = operation;
2914  mtstate->resultRelInfo = relinfo;
2915 
2916  /* ... as is PartitionTupleRouting. */
2917  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2918 
2919  /*
2920  * Find the partition to which the "search tuple" belongs.
2921  */
2922  Assert(remoteslot != NULL);
2924  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2925  remoteslot, estate);
2926  Assert(partrelinfo != NULL);
2927  partrel = partrelinfo->ri_RelationDesc;
2928 
2929  /*
2930  * Check for supported relkind. We need this since partitions might be of
2931  * unsupported relkinds; and the set of partitions can change, so checking
2932  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2933  */
2934  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2936  RelationGetRelationName(partrel));
2937 
2938  /*
2939  * To perform any of the operations below, the tuple must match the
2940  * partition's rowtype. Convert if needed or just copy, using a dedicated
2941  * slot to store the tuple in any case.
2942  */
2943  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2944  if (remoteslot_part == NULL)
2945  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2946  map = ExecGetRootToChildMap(partrelinfo, estate);
2947  if (map != NULL)
2948  {
2949  attrmap = map->attrMap;
2950  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2951  remoteslot_part);
2952  }
2953  else
2954  {
2955  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2956  slot_getallattrs(remoteslot_part);
2957  }
2958  MemoryContextSwitchTo(oldctx);
2959 
2960  /* Check if we can do the update or delete on the leaf partition. */
2961  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2962  {
2963  part_entry = logicalrep_partition_open(relmapentry, partrel,
2964  attrmap);
2965  check_relation_updatable(part_entry);
2966  }
2967 
2968  switch (operation)
2969  {
2970  case CMD_INSERT:
2971  apply_handle_insert_internal(edata, partrelinfo,
2972  remoteslot_part);
2973  break;
2974 
2975  case CMD_DELETE:
2976  apply_handle_delete_internal(edata, partrelinfo,
2977  remoteslot_part,
2978  part_entry->localindexoid);
2979  break;
2980 
2981  case CMD_UPDATE:
2982 
2983  /*
2984  * For UPDATE, depending on whether or not the updated tuple
2985  * satisfies the partition's constraint, perform a simple UPDATE
2986  * of the partition or move the updated tuple into a different
2987  * suitable partition.
2988  */
2989  {
2990  TupleTableSlot *localslot;
2991  ResultRelInfo *partrelinfo_new;
2992  Relation partrel_new;
2993  bool found;
2994 
2995  /* Get the matching local tuple from the partition. */
2996  found = FindReplTupleInLocalRel(edata, partrel,
2997  &part_entry->remoterel,
2998  part_entry->localindexoid,
2999  remoteslot_part, &localslot);
3000  if (!found)
3001  {
3002  /*
3003  * The tuple to be updated could not be found. Do nothing
3004  * except for emitting a log message.
3005  *
3006  * XXX should this be promoted to ereport(LOG) perhaps?
3007  */
3008  elog(DEBUG1,
3009  "logical replication did not find row to be updated "
3010  "in replication target relation's partition \"%s\"",
3011  RelationGetRelationName(partrel));
3012  return;
3013  }
3014 
3015  /*
3016  * Apply the update to the local tuple, putting the result in
3017  * remoteslot_part.
3018  */
3020  slot_modify_data(remoteslot_part, localslot, part_entry,
3021  newtup);
3022  MemoryContextSwitchTo(oldctx);
3023 
3024  /*
3025  * Does the updated tuple still satisfy the current
3026  * partition's constraint?
3027  */
3028  if (!partrel->rd_rel->relispartition ||
3029  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3030  false))
3031  {
3032  /*
3033  * Yes, so simply UPDATE the partition. We don't call
3034  * apply_handle_update_internal() here, which would
3035  * normally do the following work, to avoid repeating some
3036  * work already done above to find the local tuple in the
3037  * partition.
3038  */
3039  EPQState epqstate;
3040 
3041  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3042  ExecOpenIndices(partrelinfo, false);
3043 
3044  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3046  ACL_UPDATE);
3047  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3048  localslot, remoteslot_part);
3049  ExecCloseIndices(partrelinfo);
3050  EvalPlanQualEnd(&epqstate);
3051  }
3052  else
3053  {
3054  /* Move the tuple into the new partition. */
3055 
3056  /*
3057  * New partition will be found using tuple routing, which
3058  * can only occur via the parent table. We might need to
3059  * convert the tuple to the parent's rowtype. Note that
3060  * this is the tuple found in the partition, not the
3061  * original search tuple received by this function.
3062  */
3063  if (map)
3064  {
3065  TupleConversionMap *PartitionToRootMap =
3067  RelationGetDescr(parentrel));
3068 
3069  remoteslot =
3070  execute_attr_map_slot(PartitionToRootMap->attrMap,
3071  remoteslot_part, remoteslot);
3072  }
3073  else
3074  {
3075  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3076  slot_getallattrs(remoteslot);
3077  }
3078 
3079  /* Find the new partition. */
3081  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3082  proute, remoteslot,
3083  estate);
3084  MemoryContextSwitchTo(oldctx);
3085  Assert(partrelinfo_new != partrelinfo);
3086  partrel_new = partrelinfo_new->ri_RelationDesc;
3087 
3088  /* Check that new partition also has supported relkind. */
3089  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3091  RelationGetRelationName(partrel_new));
3092 
3093  /* DELETE old tuple found in the old partition. */
3094  apply_handle_delete_internal(edata, partrelinfo,
3095  localslot,
3096  part_entry->localindexoid);
3097 
3098  /* INSERT new tuple into the new partition. */
3099 
3100  /*
3101  * Convert the replacement tuple to match the destination
3102  * partition rowtype.
3103  */
3105  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3106  if (remoteslot_part == NULL)
3107  remoteslot_part = table_slot_create(partrel_new,
3108  &estate->es_tupleTable);
3109  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3110  if (map != NULL)
3111  {
3112  remoteslot_part = execute_attr_map_slot(map->attrMap,
3113  remoteslot,
3114  remoteslot_part);
3115  }
3116  else
3117  {
3118  remoteslot_part = ExecCopySlot(remoteslot_part,
3119  remoteslot);
3120  slot_getallattrs(remoteslot);
3121  }
3122  MemoryContextSwitchTo(oldctx);
3123  apply_handle_insert_internal(edata, partrelinfo_new,
3124  remoteslot_part);
3125  }
3126  }
3127  break;
3128 
3129  default:
3130  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3131  break;
3132  }
3133 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:895
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1792
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1232
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
@ CMD_UPDATE
Definition: nodes.h:266
#define makeNode(_type_)
Definition: nodes.h:155
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetNamespace(relation)
Definition: rel.h:546
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:602
PartitionTupleRouting * proute
Definition: worker.c:215
ModifyTableState * mtstate
Definition: worker.c:214
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:669
CmdType operation
Definition: execnodes.h:1355
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1359
PlanState ps
Definition: execnodes.h:1354
Plan * plan
Definition: execnodes.h:1116
EState * state
Definition: execnodes.h:1118
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:583
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, LogicalRepRelMapEntry::localindexoid, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2321 of file worker.c.

2322 {
2323  LogicalRepTyp typ;
2324 
2326  return;
2327 
2328  logicalrep_read_typ(s, &typ);
2329 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2517 of file worker.c.

2518 {
2519  LogicalRepRelMapEntry *rel;
2520  LogicalRepRelId relid;
2521  UserContext ucxt;
2522  ApplyExecutionData *edata;
2523  EState *estate;
2524  LogicalRepTupleData oldtup;
2525  LogicalRepTupleData newtup;
2526  bool has_oldtup;
2527  TupleTableSlot *remoteslot;
2528  RTEPermissionInfo *target_perminfo;
2529  MemoryContext oldctx;
2530  bool run_as_owner;
2531 
2532  /*
2533  * Quick return if we are skipping data modification changes or handling
2534  * streamed transactions.
2535  */
2536  if (is_skipping_changes() ||
2538  return;
2539 
2541 
2542  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2543  &newtup);
2544  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2545  if (!should_apply_changes_for_rel(rel))
2546  {
2547  /*
2548  * The relation can't become interesting in the middle of the
2549  * transaction so it's safe to unlock it.
2550  */
2553  return;
2554  }
2555 
2556  /* Set relation for error callback */
2558 
2559  /* Check if we can do the update. */
2561 
2562  /*
2563  * Make sure that any user-supplied code runs as the table owner, unless
2564  * the user has opted out of that behavior.
2565  */
2566  run_as_owner = MySubscription->runasowner;
2567  if (!run_as_owner)
2568  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2569 
2570  /* Initialize the executor state. */
2571  edata = create_edata_for_relation(rel);
2572  estate = edata->estate;
2573  remoteslot = ExecInitExtraTupleSlot(estate,
2574  RelationGetDescr(rel->localrel),
2575  &TTSOpsVirtual);
2576 
2577  /*
2578  * Populate updatedCols so that per-column triggers can fire, and so
2579  * executor can correctly pass down indexUnchanged hint. This could
2580  * include more columns than were actually changed on the publisher
2581  * because the logical replication protocol doesn't contain that
2582  * information. But it would for example exclude columns that only exist
2583  * on the subscriber, since we are not touching those.
2584  */
2585  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2586  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2587  {
2589  int remoteattnum = rel->attrmap->attnums[i];
2590 
2591  if (!att->attisdropped && remoteattnum >= 0)
2592  {
2593  Assert(remoteattnum < newtup.ncols);
2594  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2595  target_perminfo->updatedCols =
2596  bms_add_member(target_perminfo->updatedCols,
2598  }
2599  }
2600 
2601  /* Build the search tuple. */
2603  slot_store_data(remoteslot, rel,
2604  has_oldtup ? &oldtup : &newtup);
2605  MemoryContextSwitchTo(oldctx);
2606 
2607  /* For a partitioned table, apply update to correct partition. */
2608  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2610  remoteslot, &newtup, CMD_UPDATE);
2611  else
2613  remoteslot, &newtup, rel->localindexoid);
2614 
2615  finish_edata(edata);
2616 
2617  /* Reset relation for error callback */
2619 
2620  if (!run_as_owner)
2621  RestoreUserContext(&ucxt);
2622 
2624 
2626 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2634
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
int i
Definition: isn.c:73
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:632
Bitmapset * updatedCols
Definition: parsenodes.h:1299
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData * edata,
ResultRelInfo * relinfo,
TupleTableSlot * remoteslot,
LogicalRepTupleData * newtup,
Oid  localindexoid 
)
static

Definition at line 2634 of file worker.c.

2639 {
2640  EState *estate = edata->estate;
2641  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2642  Relation localrel = relinfo->ri_RelationDesc;
2643  EPQState epqstate;
2644  TupleTableSlot *localslot;
2645  bool found;
2646  MemoryContext oldctx;
2647 
2648  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2649  ExecOpenIndices(relinfo, false);
2650 
2651  found = FindReplTupleInLocalRel(edata, localrel,
2652  &relmapentry->remoterel,
2653  localindexoid,
2654  remoteslot, &localslot);
2655  ExecClearTuple(remoteslot);
2656 
2657  /*
2658  * Tuple found.
2659  *
2660  * Note this will fail if there are other conflicting unique indexes.
2661  */
2662  if (found)
2663  {
2664  /* Process and store remote tuple in the slot */
2666  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2667  MemoryContextSwitchTo(oldctx);
2668 
2669  EvalPlanQualSetSlot(&epqstate, remoteslot);
2670 
2671  /* Do the actual update. */
2673  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2674  remoteslot);
2675  }
2676  else
2677  {
2678  /*
2679  * The tuple to be updated could not be found. Do nothing except for
2680  * emitting a log message.
2681  *
2682  * XXX should this be promoted to ereport(LOG) perhaps?
2683  */
2684  elog(DEBUG1,
2685  "logical replication did not find row to be updated "
2686  "in replication target relation \"%s\"",
2687  RelationGetRelationName(localrel));
2688  }
2689 
2690  /* Cleanup. */
2691  ExecCloseIndices(relinfo);
2692  EvalPlanQualEnd(&epqstate);
2693 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454

References ACL_UPDATE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet * stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 1998 of file worker.c.

2000 {
2001  int nchanges;
2002  char path[MAXPGPATH];
2003  char *buffer = NULL;
2004  MemoryContext oldcxt;
2005  ResourceOwner oldowner;
2006  int fileno;
2007  off_t offset;
2008 
2009  if (!am_parallel_apply_worker())
2011 
2012  /* Make sure we have an open transaction */
2014 
2015  /*
2016  * Allocate file handle and memory required to process all the messages in
2017  * TopTransactionContext to avoid them getting reset after each message is
2018  * processed.
2019  */
2021 
2022  /* Open the spool file for the committed/prepared transaction */
2024  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2025 
2026  /*
2027  * Make sure the file is owned by the toplevel transaction so that the
2028  * file will not be accidentally closed when aborting a subtransaction.
2029  */
2030  oldowner = CurrentResourceOwner;
2032 
2033  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2034 
2035  CurrentResourceOwner = oldowner;
2036 
2037  buffer = palloc(BLCKSZ);
2038 
2039  MemoryContextSwitchTo(oldcxt);
2040 
2041  remote_final_lsn = lsn;
2042 
2043  /*
2044  * Make sure the handle apply_dispatch methods are aware we're in a remote
2045  * transaction.
2046  */
2047  in_remote_transaction = true;
2049 
2051 
2052  /*
2053  * Read the entries one by one and pass them through the same logic as in
2054  * apply_dispatch.
2055  */
2056  nchanges = 0;
2057  while (true)
2058  {
2060  size_t nbytes;
2061  int len;
2062 
2064 
2065  /* read length of the on-disk record */
2066  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2067 
2068  /* have we reached end of the file? */
2069  if (nbytes == 0)
2070  break;
2071 
2072  /* do we have a correct length? */
2073  if (len <= 0)
2074  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2075  len, path);
2076 
2077  /* make sure we have sufficiently large buffer */
2078  buffer = repalloc(buffer, len);
2079 
2080  /* and finally read the data into the buffer */
2081  BufFileReadExact(stream_fd, buffer, len);
2082 
2083  BufFileTell(stream_fd, &fileno, &offset);
2084 
2085  /* init a stringinfo using the buffer and call apply_dispatch */
2086  initReadOnlyStringInfo(&s2, buffer, len);
2087 
2088  /* Ensure we are reading the data into our memory context. */
2090 
2091  apply_dispatch(&s2);
2092 
2094 
2095  MemoryContextSwitchTo(oldcxt);
2096 
2097  nchanges++;
2098 
2099  /*
2100  * It is possible the file has been closed because we have processed
2101  * the transaction end message like stream_commit in which case that
2102  * must be the last message.
2103  */
2104  if (!stream_fd)
2105  {
2106  ensure_last_message(stream_fileset, xid, fileno, offset);
2107  break;
2108  }
2109 
2110  if (nchanges % 1000 == 0)
2111  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2112  nchanges, path);
2113  }
2114 
2115  if (stream_fd)
2117 
2118  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2119  nchanges, path);
2120 
2121  return;
2122 }
MemoryContext ApplyMessageContext
Definition: worker.c:290
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4188
void apply_dispatch(StringInfo s)
Definition: worker.c:3269
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1966
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 3828 of file worker.c.

3829 {
3831  {
3832  /*
3833  * Don't stop the parallel apply worker as the leader will detect the
3834  * subscription parameter change and restart logical replication later
3835  * anyway. This also prevents the leader from reporting errors when
3836  * trying to communicate with a stopped parallel apply worker, which
3837  * would accidentally disable subscriptions if disable_on_error was
3838  * set.
3839  */
3840  return;
3841  }
3842 
3843  /*
3844  * Reset the last-start time for this apply worker so that the launcher
3845  * will restart it without waiting for wal_retrieve_retry_interval if the
3846  * subscription is still active, and so that we won't leak that hash table
3847  * entry if it isn't.
3848  */
3849  if (am_leader_apply_worker())
3851 
3852  proc_exit(0);
3853 }
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1081
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), and maybe_reread_subscription().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4665 of file worker.c.

4666 {
4667  int worker_slot = DatumGetInt32(main_arg);
4668 
4669  InitializingApplyWorker = true;
4670 
4671  SetupApplyOrSyncWorker(worker_slot);
4672 
4673  InitializingApplyWorker = false;
4674 
4675  run_apply_worker();
4676 
4677  proc_exit(0);
4678 }
bool InitializingApplyWorker
Definition: worker.c:318
static void run_apply_worker()
Definition: worker.c:4436
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4624
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 4983 of file worker.c.

4984 {
4985  if (isCommit && on_commit_wakeup_workers_subids != NIL)
4986  {
4987  ListCell *lc;
4988 
4989  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
4990  foreach(lc, on_commit_wakeup_workers_subids)
4991  {
4992  Oid subid = lfirst_oid(lc);
4993  List *workers;
4994  ListCell *lc2;
4995 
4996  workers = logicalrep_workers_find(subid, true, false);
4997  foreach(lc2, workers)
4998  {
4999  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5000 
5002  }
5003  }
5004  LWLockRelease(LogicalRepWorkerLock);
5005  }
5006 
5007  /* The List storage will be reclaimed automatically in xact cleanup. */
5009 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:301
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:275
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inline static

Definition at line 4188 of file worker.c.

4189 {
4190  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4191 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry * rel)
static

Definition at line 2476 of file worker.c.

2477 {
2478  /*
2479  * For partitioned tables, we only need to care if the target partition is
2480  * updatable (aka has PK or RI defined for it).
2481  */
2482  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2483  return;
2484 
2485  /* Updatable, no error. */
2486  if (rel->updatable)
2487  return;
2488 
2489  /*
2490  * We are in error mode so it's fine this is somewhat slow. It's better to
2491  * give user correct error.
2492  */
2494  {
2495  ereport(ERROR,
2496  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2497  errmsg("publisher did not send replica identity column "
2498  "expected by the logical replication target relation \"%s.%s\"",
2499  rel->remoterel.nspname, rel->remoterel.relname)));
2500  }
2501 
2502  ereport(ERROR,
2503  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2504  errmsg("logical replication target relation \"%s.%s\" has "
2505  "neither REPLICA IDENTITY index nor PRIMARY "
2506  "KEY and published relation does not have "
2507  "REPLICA IDENTITY FULL",
2508  rel->remoterel.nspname, rel->remoterel.relname)));
2509 }
#define OidIsValid(objectId)
Definition: c.h:775
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:851

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inline static

Definition at line 4385 of file worker.c.

4386 {
4387  if (subxact_data.subxacts)
4389 
4390  subxact_data.subxacts = NULL;
4392  subxact_data.nsubxacts = 0;
4394 }
static ApplySubXactData subxact_data
Definition: worker.c:357
void pfree(void *pointer)
Definition: mcxt.c:1521
uint32 nsubxacts
Definition: worker.c:351
uint32 nsubxacts_max
Definition: worker.c:352
SubXactInfo * subxacts
Definition: worker.c:354
TransactionId subxact_last
Definition: worker.c:353

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 4792 of file worker.c.

4793 {
4794  Relation rel;
4795  Form_pg_subscription subform;
4796  HeapTuple tup;
4797  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4798  bool started_tx = false;
4799 
4801  return;
4802 
4803  if (!IsTransactionState())
4804  {
4806  started_tx = true;
4807  }
4808 
4809  /*
4810  * Protect subskiplsn of pg_subscription from being concurrently updated
4811  * while clearing it.
4812  */
4813  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4814  AccessShareLock);
4815 
4816  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4817 
4818  /* Fetch the existing tuple. */
4819  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4821 
4822  if (!HeapTupleIsValid(tup))
4823  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4824 
4825  subform = (Form_pg_subscription) GETSTRUCT(tup);
4826 
4827  /*
4828  * Clear the subskiplsn. If the user has already changed subskiplsn before
4829  * clearing it we don't update the catalog and the replication origin
4830  * state won't get advanced. So in the worst case, if the server crashes
4831  * before sending an acknowledgment of the flush position the transaction
4832  * will be sent again and the user needs to set subskiplsn again. We can
4833  * reduce the possibility by logging a replication origin WAL record to
4834  * advance the origin LSN instead but there is no way to advance the
4835  * origin timestamp and it doesn't seem to be worth doing anything about
4836  * it since it's a very rare case.
4837  */
4838  if (subform->subskiplsn == myskiplsn)
4839  {
4840  bool nulls[Natts_pg_subscription];
4841  bool replaces[Natts_pg_subscription];
4842  Datum values[Natts_pg_subscription];
4843 
4844  memset(values, 0, sizeof(values));
4845  memset(nulls, false, sizeof(nulls));
4846  memset(replaces, false, sizeof(replaces));
4847 
4848  /* reset subskiplsn */
4849  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4850  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4851 
4852  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4853  replaces);
4854  CatalogTupleUpdate(rel, &tup->t_self, tup);
4855 
4856  if (myskiplsn != finish_lsn)
4857  ereport(WARNING,
4858  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4859  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4860  LSN_FORMAT_ARGS(finish_lsn),
4861  LSN_FORMAT_ARGS(myskiplsn)));
4862  }
4863 
4864  heap_freetuple(tup);
4865  table_close(rel, NoLock);
4866 
4867  if (started_tx)
4869 }
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define likely(x)
Definition: c.h:310
int errdetail(const char *fmt,...)
Definition: elog.c:1203
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1073
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry * rel)
static

Definition at line 649 of file worker.c.

650 {
651  ApplyExecutionData *edata;
652  EState *estate;
653  RangeTblEntry *rte;
654  List *perminfos = NIL;
655  ResultRelInfo *resultRelInfo;
656 
657  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
658  edata->targetRel = rel;
659 
660  edata->estate = estate = CreateExecutorState();
661 
662  rte = makeNode(RangeTblEntry);
663  rte->rtekind = RTE_RELATION;
664  rte->relid = RelationGetRelid(rel->localrel);
665  rte->relkind = rel->localrel->rd_rel->relkind;
666  rte->rellockmode = AccessShareLock;
667 
668  addRTEPermissionInfo(&perminfos, rte);
669 
670  ExecInitRangeTable(estate, list_make1(rte), perminfos);
671 
672  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
673 
674  /*
675  * Use Relation opened by logicalrep_rel_open() instead of opening it
676  * again.
677  */
678  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
679 
680  /*
681  * We put the ResultRelInfo in the es_opened_result_relations list, even
682  * though we don't populate the es_result_relations array. That's a bit
683  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
684  *
685  * ExecOpenIndices() is not called here either, each execution path doing
686  * an apply operation being responsible for that.
687  */
689  lappend(estate->es_opened_result_relations, resultRelInfo);
690 
691  estate->es_output_cid = GetCurrentCommandId(true);
692 
693  /* Prepare to catch AFTER triggers. */
695 
696  /* other fields of edata remain NULL for now */
697 
698  return edata;
699 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1199
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:728
EState * CreateExecutorState(void)
Definition: execUtils.c:88
void * palloc0(Size size)
Definition: mcxt.c:1347
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1028
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:505
List * es_opened_result_relations
Definition: execnodes.h:645
CommandId es_output_cid
Definition: execnodes.h:639
RTEKind rtekind
Definition: parsenodes.h:1057
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5020
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:827

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 4685 of file worker.c.

4686 {
4687  /*
4688  * Emit the error message, and recover from the error state to an idle
4689  * state
4690  */
4691  HOLD_INTERRUPTS();
4692 
4693  EmitErrorReport();
4695  FlushErrorState();
4696 
4698 
4699  /* Report the worker failed during either table synchronization or apply */
4701  !am_tablesync_worker());
4702 
4703  /* Disable the subscription */
4707 
4708  /* Ensure we remove no-longer-useful entry for worker's start time */
4709  if (am_leader_apply_worker())
4711 
4712  /* Notify the subscription has been disabled and exit */
4713  ereport(LOG,
4714  errmsg("subscription \"%s\" has been disabled because of an error",
4715  MySubscription->name));
4716 
4717  proc_exit(0);
4718 }
void EmitErrorReport(void)
Definition: elog.c:1670
void FlushErrorState(void)
Definition: elog.c:1850
#define LOG
Definition: elog.h:31
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4849

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet * stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 1966 of file worker.c.

1968 {
1969  char path[MAXPGPATH];
1970  BufFile *fd;
1971  int last_fileno;
1972  off_t last_offset;
1973 
1975 
1977 
1979 
1980  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
1981 
1982  BufFileSeek(fd, 0, 0, SEEK_END);
1983  BufFileTell(fd, &last_fileno, &last_offset);
1984 
1985  BufFileClose(fd);
1986 
1988 
1989  if (last_fileno != fileno || last_offset != offset)
1990  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
1991  path);
1992 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData * edata,
Relation  localrel,
LogicalRepRelation * remoterel,
Oid  localidxoid,
TupleTableSlot * remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2843 of file worker.c.

2848 {
2849  EState *estate = edata->estate;
2850  bool found;
2851 
2852  /*
2853  * Regardless of the top-level operation, we're performing a read here, so
2854  * check for SELECT privileges.
2855  */
2856  TargetPrivilegesCheck(localrel, ACL_SELECT);
2857 
2858  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2859 
2860  Assert(OidIsValid(localidxoid) ||
2861  (remoterel->replident == REPLICA_IDENTITY_FULL));
2862 
2863  if (OidIsValid(localidxoid))
2864  {
2865 #ifdef USE_ASSERT_CHECKING
2866  Relation idxrel = index_open(localidxoid, AccessShareLock);
2867 
2868  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2869  Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2871  edata->targetRel->attrmap));
2872  index_close(idxrel, AccessShareLock);
2873 #endif
2874 
2875  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2877  remoteslot, *localslot);
2878  }
2879  else
2880  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2881  remoteslot, *localslot);
2882 
2883  return found;
2884 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2404
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap)
Definition: relation.c:804

References AccessShareLock, ACL_SELECT, Assert, LogicalRepRelMapEntry::attrmap, BuildIndexInfo(), EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData * edata)
static

Definition at line 706 of file worker.c.

707 {
708  EState *estate = edata->estate;
709 
710  /* Handle any queued AFTER triggers. */
711  AfterTriggerEndQuery(estate);
712 
713  /* Shut down tuple routing, if any was done. */
714  if (edata->proute)
715  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
716 
717  /*
718  * Cleanup. It might seem that we should call ExecCloseResultRelations()
719  * here, but we intentionally don't. It would close the rel we added to
720  * es_opened_result_relations above, which is wrong because we took no
721  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
722  * any other relations opened during execution.
723  */
724  ExecResetTupleTable(estate->es_tupleTable, false);
725  FreeExecutorState(estate);
726  pfree(edata);
727 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1278
void FreeExecutorState(EState *estate)
Definition: execUtils.c:189
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5040

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr * write,
XLogRecPtr * flush,
bool * have_pending_txes
)
static

Definition at line 3389 of file worker.c.

3391 {
3392  dlist_mutable_iter iter;
3393  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3394 
3396  *flush = InvalidXLogRecPtr;
3397 
3399  {
3400  FlushPosition *pos =
3401  dlist_container(FlushPosition, node, iter.cur);
3402 
3403  *write = pos->remote_end;
3404 
3405  if (pos->local_end <= local_flush)
3406  {
3407  *flush = pos->remote_end;
3408  dlist_delete(iter.cur);
3409  pfree(pos);
3410  }
3411  else
3412  {
3413  /*
3414  * Don't want to uselessly iterate over the rest of the list which
3415  * could potentially be long. Instead get the last element and
3416  * grab the write position from there.
3417  */
3418  pos = dlist_tail_element(FlushPosition, node,
3419  &lsn_mapping);
3420  *write = pos->remote_end;
3421  *have_pending_txes = true;
3422  return;
3423  }
3424  }
3425 
3426  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3427 }
static dlist_head lsn_mapping
Definition: worker.c:204
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:201
XLogRecPtr local_end
Definition: worker.c:200
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6455

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 5030 of file worker.c.

5031 {
5032  *winfo = NULL;
5033 
5034  if (am_parallel_apply_worker())
5035  {
5036  return TRANS_PARALLEL_APPLY;
5037  }
5038 
5039  /*
5040  * If we are processing this transaction using a parallel apply worker
5041  * then either we send the changes to the parallel worker or if the worker
5042  * is busy then serialize the changes to the file which will later be
5043  * processed by the parallel worker.
5044  */
5045  *winfo = pa_find_worker(xid);
5046 
5047  if (*winfo && (*winfo)->serialize_changes)
5048  {
5049  return TRANS_LEADER_PARTIAL_SERIALIZE;
5050  }
5051  else if (*winfo)
5052  {
5053  return TRANS_LEADER_SEND_TO_PARALLEL;
5054  }
5055 
5056  /*
5057  * If there is no parallel worker involved to process this transaction
5058  * then we either directly apply the change or serialize it to a file
5059  * which will later be applied when the transaction finish message is
5060  * processed.
5061  */
5062  else if (in_streamed_transaction)
5063  {
5064  return TRANS_LEADER_SERIALIZE;
5065  }
5066  else
5067  {
5068  return TRANS_LEADER_APPLY;
5069  }
5070 }
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 556 of file worker.c.

557 {
558  TransactionId current_xid;
560  TransApplyAction apply_action;
561  StringInfoData original_msg;
562 
563  apply_action = get_transaction_apply_action(stream_xid, &winfo);
564 
565  /* not in streaming mode */
566  if (apply_action == TRANS_LEADER_APPLY)
567  return false;
568 
570 
571  /*
572  * The parallel apply worker needs the xid in this message to decide
573  * whether to define a savepoint, so save the original message that has
574  * not moved the cursor after the xid. We will serialize this message to a
575  * file in PARTIAL_SERIALIZE mode.
576  */
577  original_msg = *s;
578 
579  /*
580  * We should have received XID of the subxact as the first part of the
581  * message, so extract it.
582  */
583  current_xid = pq_getmsgint(s, 4);
584 
585  if (!TransactionIdIsValid(current_xid))
586  ereport(ERROR,
587  (errcode(ERRCODE_PROTOCOL_VIOLATION),
588  errmsg_internal("invalid transaction ID in streamed replication transaction")));
589 
590  switch (apply_action)
591  {
593  Assert(stream_fd);
594 
595  /* Add the new subxact to the array (unless already there). */
596  subxact_info_add(current_xid);
597 
598  /* Write the change to the current file */
600  return true;
601 
603  Assert(winfo);
604 
605  /*
606  * XXX The publisher side doesn't always send relation/type update
607  * messages after the streaming transaction, so also update the
608  * relation/type in leader apply worker. See function
609  * cleanup_rel_sync_cache.
610  */
611  if (pa_send_data(winfo, s->len, s->data))
612  return (action != LOGICAL_REP_MSG_RELATION &&
614 
615  /*
616  * Switch to serialize mode when we are not able to send the
617  * change to parallel apply worker.
618  */
619  pa_switch_to_partial_serialize(winfo, false);
620 
621  /* fall through */
623  stream_write_change(action, &original_msg);
624 
625  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
626  return (action != LOGICAL_REP_MSG_RELATION &&
628 
631 
632  /* Define a savepoint for a subxact if needed. */
633  pa_start_subtrans(current_xid, stream_xid);
634  return false;
635 
636  default:
637  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
638  return false; /* silence compiler warning */
639  }
640 }
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4103
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415

References generate_unaccent_rules::action, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 4541 of file worker.c.

4542 {
4543  MemoryContext oldctx;
4544 
4545  /* Run as replica session replication role. */
4546  SetConfigOption("session_replication_role", "replica",
4548 
4549  /* Connect to our database. */
4552  0);
4553 
4554  /*
4555  * Set always-secure search path, so malicious users can't redirect user
4556  * code (e.g. pg_index.indexprs).
4557  */
4558  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4559 
4560  /* Load the subscription into persistent memory context. */
4562  "ApplyContext",
4566 
4568  if (!MySubscription)
4569  {
4570  ereport(LOG,
4571  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4573 
4574  /* Ensure we remove no-longer-useful entry for worker's start time */
4575  if (am_leader_apply_worker())
4577 
4578  proc_exit(0);
4579  }
4580 
4581  MySubscriptionValid = true;
4582  MemoryContextSwitchTo(oldctx);
4583 
4584  if (!MySubscription->enabled)
4585  {
4586  ereport(LOG,
4587  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4588  MySubscription->name)));
4589 
4591  }
4592 
4593  /* Setup synchronous commit according to the user's wishes */
4594  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4596 
4597  /*
4598  * Keep us informed about subscription or role changes. Note that the
4599  * role's superuser privilege can be revoked.
4600  */
4601  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4603  (Datum) 0);
4604 
4607  (Datum) 0);
4608 
4609  if (am_tablesync_worker())
4610  ereport(LOG,
4611  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4614  else
4615  ereport(LOG,
4616  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4617  MySubscription->name)));
4618 
4620 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3988
static void apply_worker_exit(void)
Definition: worker.c:3828
MemoryContext ApplyContext
Definition: worker.c:291
static bool MySubscriptionValid
Definition: worker.c:299
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:892
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4282
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
MemoryContext TopMemoryContext
Definition: mcxt.c:149
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4733 of file worker.c.

4734 {
4735  return IsLogicalWorker() && am_parallel_apply_worker();
4736 }
bool IsLogicalWorker(void)
Definition: worker.c:4724

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4724 of file worker.c.

4725 {
4726  return MyLogicalRepWorker != NULL;
4727 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3475 of file worker.c.

3476 {
3477  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3478  bool ping_sent = false;
3479  TimeLineID tli;
3480  ErrorContextCallback errcallback;
3481 
3482  /*
3483  * Init the ApplyMessageContext which we clean up after each replication
3484  * protocol message.
3485  */
3487  "ApplyMessageContext",
3489 
3490  /*
3491  * This memory context is used for per-stream data when the streaming mode
3492  * is enabled. This context is reset on each stream stop.
3493  */
3495  "LogicalStreamingContext",
3497 
3498  /* mark as idle, before starting to loop */
3500 
3501  /*
3502  * Push apply error context callback. Fields will be filled while applying
3503  * a change.
3504  */
3505  errcallback.callback = apply_error_callback;
3506  errcallback.previous = error_context_stack;
3507  error_context_stack = &errcallback;
3509 
3510  /* This outer loop iterates once per wait. */
3511  for (;;)
3512  {
3514  int rc;
3515  int len;
3516  char *buf = NULL;
3517  bool endofstream = false;
3518  long wait_time;
3519 
3521 
3523 
3525 
3526  if (len != 0)
3527  {
3528  /* Loop to process all available data (without blocking). */
3529  for (;;)
3530  {
3532 
3533  if (len == 0)
3534  {
3535  break;
3536  }
3537  else if (len < 0)
3538  {
3539  ereport(LOG,
3540  (errmsg("data stream from publisher has ended")));
3541  endofstream = true;
3542  break;
3543  }
3544  else
3545  {
3546  int c;
3547  StringInfoData s;
3548 
3549  if (ConfigReloadPending)
3550  {
3551  ConfigReloadPending = false;
3553  }
3554 
3555  /* Reset timeout. */
3556  last_recv_timestamp = GetCurrentTimestamp();
3557  ping_sent = false;
3558 
3559  /* Ensure we are reading the data into our memory context. */
3561 
3563 
3564  c = pq_getmsgbyte(&s);
3565 
3566  if (c == 'w')
3567  {
3568  XLogRecPtr start_lsn;
3569  XLogRecPtr end_lsn;
3570  TimestampTz send_time;
3571 
3572  start_lsn = pq_getmsgint64(&s);
3573  end_lsn = pq_getmsgint64(&s);
3574  send_time = pq_getmsgint64(&s);
3575 
3576  if (last_received < start_lsn)
3577  last_received = start_lsn;
3578 
3579  if (last_received < end_lsn)
3580  last_received = end_lsn;
3581 
3582  UpdateWorkerStats(last_received, send_time, false);
3583 
3584  apply_dispatch(&s);
3585  }
3586  else if (c == 'k')
3587  {
3588  XLogRecPtr end_lsn;
3590  bool reply_requested;
3591 
3592  end_lsn = pq_getmsgint64(&s);
3593  timestamp = pq_getmsgint64(&s);
3594  reply_requested = pq_getmsgbyte(&s);
3595 
3596  if (last_received < end_lsn)
3597  last_received = end_lsn;
3598 
3599  send_feedback(last_received, reply_requested, false);
3600  UpdateWorkerStats(last_received, timestamp, true);
3601  }
3602  /* other message types are purposefully ignored */
3603 
3605  }
3606 
3608  }
3609  }
3610 
3611  /* confirm all writes so far */
3612  send_feedback(last_received, false, false);
3613 
3615  {
3616  /*
3617  * If we didn't get any transactions for a while there might be
3618  * unconsumed invalidation messages in the queue, consume them
3619  * now.
3620  */
3623 
3624  /* Process any table synchronization changes. */
3625  process_syncing_tables(last_received);
3626  }
3627 
3628  /* Cleanup the memory. */
3631 
3632  /* Check if we need to exit the streaming loop. */
3633  if (endofstream)
3634  break;
3635 
3636  /*
3637  * Wait for more data or latch. If we have unflushed transactions,
3638  * wake up after WalWriterDelay to see if they've been flushed yet (in
3639  * which case we should send a feedback message). Otherwise, there's
3640  * no particular urgency about waking up unless we get data or a
3641  * signal.
3642  */
3643  if (!dlist_is_empty(&lsn_mapping))
3644  wait_time = WalWriterDelay;
3645  else
3646  wait_time = NAPTIME_PER_CYCLE;
3647 
3651  fd, wait_time,
3652  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3653 
3654  if (rc & WL_LATCH_SET)
3655  {
3658  }
3659 
3660  if (ConfigReloadPending)
3661  {
3662  ConfigReloadPending = false;
3664  }
3665 
3666  if (rc & WL_TIMEOUT)
3667  {
3668  /*
3669  * We didn't receive anything new. If we haven't heard anything
3670  * from the server for more than wal_receiver_timeout / 2, ping
3671  * the server. Also, if it's been longer than
3672  * wal_receiver_status_interval since the last update we sent,
3673  * send a status update to the primary anyway, to report any
3674  * progress in applying WAL.
3675  */
3676  bool requestReply = false;
3677 
3678  /*
3679  * Check if time since last receive from primary has reached the
3680  * configured limit.
3681  */
3682  if (wal_receiver_timeout > 0)
3683  {
3685  TimestampTz timeout;
3686 
3687  timeout =
3688  TimestampTzPlusMilliseconds(last_recv_timestamp,
3690 
3691  if (now >= timeout)
3692  ereport(ERROR,
3693  (errcode(ERRCODE_CONNECTION_FAILURE),
3694  errmsg("terminating logical replication worker due to timeout")));
3695 
3696  /* Check to see if it's time for a ping. */
3697  if (!ping_sent)
3698  {
3699  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3700  (wal_receiver_timeout / 2));
3701  if (now >= timeout)
3702  {
3703  requestReply = true;
3704  ping_sent = true;
3705  }
3706  }
3707  }
3708 
3709  send_feedback(last_received, requestReply, requestReply);
3710 
3711  /*
3712  * Force reporting to ensure long idle periods don't lead to
3713  * arbitrarily delayed stats. Stats can only be reported outside
3714  * of (implicit or explicit) transactions. That shouldn't lead to
3715  * stats being delayed for long, because transactions are either
3716  * sent as a whole on commit or streamed. Streamed transactions
3717  * are spilled to disk and applied on commit.
3718  */
3719  if (!IsTransactionState())
3720  pgstat_report_stat(true);
3721  }
3722  }
3723 
3724  /* Pop the error context stack */
3725  error_context_stack = errcallback.previous;
3727 
3728  /* All done */
3730 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3459
#define NAPTIME_PER_CYCLE
Definition: worker.c:195
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:288
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3739
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:296
void apply_error_callback(void *arg)
Definition: worker.c:4873
static MemoryContext LogicalStreamingContext
Definition: worker.c:294
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:94
struct Latch * MyLatch
Definition: globals.c:61
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static char * buf
Definition: pg_test_fsync.c:73
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
char * c
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_receiver_timeout
Definition: walreceiver.c:88
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:452
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:454
int WalWriterDelay
Definition: walwriter.c:71
uint32 TimeLineID
Definition: xlogdefs.h:59

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3859 of file worker.c.

3860 {
3861  MemoryContext oldctx;
3863  bool started_tx = false;
3864 
3865  /* When cache state is valid there is nothing to do here. */
3866  if (MySubscriptionValid)
3867  return;
3868 
3869  /* This function might be called inside or outside of transaction. */
3870  if (!IsTransactionState())
3871  {
3873  started_tx = true;
3874  }
3875 
3876  /* Ensure allocations in permanent context. */
3878 
3880 
3881  /*
3882  * Exit if the subscription was removed. This normally should not happen
3883  * as the worker gets killed during DROP SUBSCRIPTION.
3884  */
3885  if (!newsub)
3886  {
3887  ereport(LOG,
3888  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3889  MySubscription->name)));
3890 
3891  /* Ensure we remove no-longer-useful entry for worker's start time */
3892  if (am_leader_apply_worker())
3894 
3895  proc_exit(0);
3896  }
3897 
3898  /* Exit if the subscription was disabled. */
3899  if (!newsub->enabled)
3900  {
3901  ereport(LOG,
3902  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3903  MySubscription->name)));
3904 
3906  }
3907 
3908  /* !slotname should never happen when enabled is true. */
3909  Assert(newsub->slotname);
3910 
3911  /* two-phase cannot be altered while the worker is running */
3912  Assert(newsub->twophasestate == MySubscription->twophasestate);
3913 
3914  /*
3915  * Exit if any parameter that affects the remote connection was changed.
3916  * The launcher will start a new worker but note that the parallel apply
3917  * worker won't restart if the streaming option's value is changed from
3918  * 'parallel' to any other value or the server decides not to stream the
3919  * in-progress transaction.
3920  */
3921  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3922  strcmp(newsub->name, MySubscription->name) != 0 ||
3923  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3924  newsub->binary != MySubscription->binary ||
3925  newsub->stream != MySubscription->stream ||
3926  newsub->passwordrequired != MySubscription->passwordrequired ||
3927  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3928  newsub->owner != MySubscription->owner ||
3929  !equal(newsub->publications, MySubscription->publications))
3930  {
3932  ereport(LOG,
3933  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3934  MySubscription->name)));
3935  else
3936  ereport(LOG,
3937  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3938  MySubscription->name)));
3939 
3941  }
3942 
3943  /*
3944  * Exit if the subscription owner's superuser privileges have been
3945  * revoked.
3946  */
3947  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3948  {
3950  ereport(LOG,
3951  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3952  MySubscription->name));
3953  else
3954  ereport(LOG,
3955  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3956  MySubscription->name));
3957 
3959  }
3960 
3961  /* Check for other changes that should never happen too. */
3962  if (newsub->dbid != MySubscription->dbid)
3963  {
3964  elog(ERROR, "subscription %u changed unexpectedly",
3966  }
3967 
3968  /* Clean old subscription info and switch to new one. */
3971 
3972  MemoryContextSwitchTo(oldctx);
3973 
3974  /* Change synchronous commit according to the user's wishes */
3975  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3977 
3978  if (started_tx)
3980 
3981  MySubscriptionValid = true;
3982 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 4743 of file worker.c.

4744 {
4748 
4749  /*
4750  * Quick return if it's not requested to skip this transaction. This
4751  * function is called for every remote transaction and we assume that
4752  * skipping the transaction is not used often.
4753  */
4755  MySubscription->skiplsn != finish_lsn))
4756  return;
4757 
4758  /* Start skipping all changes of this transaction */
4759  skip_xact_finish_lsn = finish_lsn;
4760 
4761  ereport(LOG,
4762  errmsg("logical replication starts skipping transaction at LSN %X/%X",
4764 }
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:335

References Assert, ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 425 of file worker.c.

427 {
428  if (OidIsValid(relid))
429  {
430  /* Replication origin name for tablesync workers. */
431  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
432  }
433  else
434  {
435  /* Replication origin name for non-tablesync workers. */
436  snprintf(originname, szoriginname, "pg_%u", suboid);
437  }
438 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), run_apply_worker(), and run_tablesync_worker().

◆ reset_apply_error_context_info()

◆ run_apply_worker()

static void run_apply_worker ( )
static

Definition at line 4436 of file worker.c.

4437 {
4438  char originname[NAMEDATALEN];
4439  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4440  char *slotname = NULL;
4442  RepOriginId originid;
4443  TimeLineID startpointTLI;
4444  char *err;
4445  bool must_use_password;
4446 
4447  slotname = MySubscription->slotname;
4448 
4449  /*
4450  * This shouldn't happen if the subscription is enabled, but guard against
4451  * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4452  * slot is NULL.)
4453  */
4454  if (!slotname)
4455  ereport(ERROR,
4456  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4457  errmsg("subscription has no replication slot set")));
4458 
4459  /* Setup replication origin tracking. */
4461  originname, sizeof(originname));
4463  originid = replorigin_by_name(originname, true);
4464  if (!OidIsValid(originid))
4465  originid = replorigin_create(originname);
4466  replorigin_session_setup(originid, 0);
4467  replorigin_session_origin = originid;
4468  origin_startpos = replorigin_session_get_progress(false);
4470 
4471  /* Is the use of a password mandatory? */
4472  must_use_password = MySubscription->passwordrequired &&
4474 
4476  true, must_use_password,
4477  MySubscription->name, &err);
4478 
4479  if (LogRepWorkerWalRcvConn == NULL)
4480  ereport(ERROR,
4481  (errcode(ERRCODE_CONNECTION_FAILURE),
4482  errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4483  MySubscription->name, err)));
4484 
4485  /*
4486  * We don't really use the output identify_system for anything but it does
4487  * some initializations on the upstream so let's still call it.
4488  */
4489  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4490 
4491  set_apply_error_context_origin(originname);
4492 
4493  set_stream_options(&options, slotname, &origin_startpos);
4494 
4495  /*
4496  * Even when the two_phase mode is requested by the user, it remains as
4497  * the tri-state PENDING until all tablesyncs have reached READY state.
4498  * Only then, can it become ENABLED.
4499  *
4500  * Note: If the subscription has no tables then leave the state as
4501  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4502  * work.
4503  */
4506  {
4507  /* Start streaming with two_phase enabled */
4508  options.proto.logical.twophase = true;
4510 
4515  }
4516  else
4517  {
4519  }
4520 
4521  ereport(DEBUG1,
4522  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4527  "?")));
4528 
4529  /* Run the main loop. */
4530  start_apply(origin_startpos);
4531 }
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4335
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4404
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:425
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5015
void err(int eval, const char *fmt,...)
Definition: err.c:43
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:155
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1237
#define NAMEDATALEN
static char ** options
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
bool AllTablesyncsReady(void)
Definition: tablesync.c:1732
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1757
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:450
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:434
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:442
uint16 RepOriginId
Definition: xlogdefs.h:65

References AllTablesyncsReady(), CommitTransactionCommand(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, InvalidOid, InvalidXLogRecPtr, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, LogRepWorkerWalRcvConn, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::ownersuperuser, Subscription::passwordrequired, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), set_apply_error_context_origin(), set_stream_options(), Subscription::slotname, start_apply(), StartTransactionCommand(), Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by ApplyWorkerMain().

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 3739 of file worker.c.

3740 {
3741  static StringInfo reply_message = NULL;
3742  static TimestampTz send_time = 0;
3743 
3744  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3745  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3746  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3747 
3748  XLogRecPtr writepos;
3749  XLogRecPtr flushpos;
3750  TimestampTz now;
3751  bool have_pending_txes;
3752 
3753  /*
3754  * If the user doesn't want status to be reported to the publisher, be
3755  * sure to exit before doing anything at all.
3756  */
3757  if (!force && wal_receiver_status_interval <= 0)
3758  return;
3759 
3760  /* It's legal to not pass a recvpos */
3761  if (recvpos < last_recvpos)
3762  recvpos = last_recvpos;
3763 
3764  get_flush_position(&writepos, &flushpos, &have_pending_txes);
3765 
3766  /*
3767  * No outstanding transactions to flush, we can report the latest received
3768  * position. This is important for synchronous replication.
3769  */
3770  if (!have_pending_txes)
3771  flushpos = writepos = recvpos;
3772 
3773  if (writepos < last_writepos)
3774  writepos = last_writepos;
3775 
3776  if (flushpos < last_flushpos)
3777  flushpos = last_flushpos;
3778 
3779  now = GetCurrentTimestamp();
3780 
3781  /* if we've already reported everything we're good */
3782  if (!force &&
3783  writepos == last_writepos &&
3784  flushpos == last_flushpos &&
3785  !TimestampDifferenceExceeds(send_time, now,
3786  wal_receiver_status_interval * 1000))
3787  return;
3788  send_time = now;
3789 
3790  if (!reply_message)
3791  {
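3792  MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3793 
3794  reply_message = makeStringInfo();
3795  MemoryContextSwitchTo(oldctx);
3796  }
3797  else
3798  resetStringInfo(reply_message);
3799 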
3800  pq_sendbyte(reply_message, 'r');
3801  pq_sendint64(reply_message, recvpos); /* write */
3802  pq_sendint64(reply_message, flushpos); /* flush */
3803  pq_sendint64(reply_message, writepos); /* apply */
3804  pq_sendint64(reply_message, now); /* sendTime */
3805  pq_sendbyte(reply_message, requestReply); /* replyRequested */
3806 
3807  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3808  force,
3809  LSN_FORMAT_ARGS(recvpos),
3810  LSN_FORMAT_ARGS(writepos),
3811  LSN_FORMAT_ARGS(flushpos));
3812 
3813  walrcv_send(LogRepWorkerWalRcvConn,
3814  reply_message->data, reply_message->len);
3815 
3816  if (recvpos > last_recvpos)
3817  last_recvpos = recvpos;
3818  if (writepos > last_writepos)
3819  last_writepos = writepos;
3820  if (flushpos > last_flushpos)
3821  last_flushpos = flushpos;
3822 }
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3389
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1791
#define DEBUG2
Definition: elog.h:29
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
static StringInfoData reply_message
Definition: walreceiver.c:131
int wal_receiver_status_interval
Definition: walreceiver.c:87
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:456

References ApplyContext, StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 5015 of file worker.c.

5016 {
5017  apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5018  originname);
5019 }
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_xact()

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char *  slotname,
XLogRecPtr origin_startpos 
)

Definition at line 4335 of file worker.c.

4338 {
4339  int server_version;
4340 
4341  options->logical = true;
4342  options->startpoint = *origin_startpos;
4343  options->slotname = slotname;
4344 
4346  options->proto.logical.proto_version =
4351 
4352  options->proto.logical.publication_names = MySubscription->publications;
4353  options->proto.logical.binary = MySubscription->binary;
4354 
4355  /*
4356  * Assign the appropriate option value for streaming option according to
4357  * the 'streaming' mode and the publisher's ability to support that mode.
4358  */
4359  if (server_version >= 160000 &&
4361  {
4362  options->proto.logical.streaming_str = "parallel";
4364  }
4365  else if (server_version >= 140000 &&
4367  {
4368  options->proto.logical.streaming_str = "on";
4370  }
4371  else
4372  {
4373  options->proto.logical.streaming_str = NULL;
4375  }
4376 
4377  options->proto.logical.twophase = false;
4378  options->proto.logical.origin = pstrdup(MySubscription->origin);
4379 }
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static int server_version
Definition: pg_dumpall.c:110
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define walrcv_server_version(conn)
Definition: walreceiver.h:446

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_PARALLEL, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 4624 of file worker.c.

4625 {
4626  /* Attach to slot */
4627  logicalrep_worker_attach(worker_slot);
4628 
4630 
4631  /* Setup signal handling */
4633  pqsignal(SIGTERM, die);
4635 
4636  /*
4637  * We don't currently need any ResourceOwner in a walreceiver process, but
4638  * if we did, we could call CreateAuxProcessResourceOwner here.
4639  */
4640 
4641  /* Initialise stats to a sanish value */
4644 
4645  /* Load the libpq-specific functions */
4646  load_file("libpqwalreceiver", false);
4647 
4649 
4650  /* Connect to the origin and start the replication. */
4651  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4653 
4654  /*
4655  * Setup callback for syscache so that we know when something changes in
4656  * the subscription relation state.
4657  */
4658  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4660  (Datum) 0);
4661 }
void InitializeLogRepWorker(void)
Definition: worker.c:4541
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:932
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition: launcher.c:713
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:281
#define SIGHUP
Definition: win32_port.h:168

References am_leader_apply_worker(), am_tablesync_worker(), Assert, BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog, GetCurrentTimestamp(), InitializeLogRepWorker(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal(), LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), and TablesyncWorkerMain().

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 465 of file worker.c.

466 {
467  switch (MyLogicalRepWorker->type)
468  {
469  case WORKERTYPE_TABLESYNC:
470  return MyLogicalRepWorker->relid == rel->localreloid;
471 
472  case WORKERTYPE_PARALLEL_APPLY:
473  /* We don't synchronize rel's that are in unknown state. */
474  if (rel->state != SUBREL_STATE_READY &&
475  rel->state != SUBREL_STATE_UNKNOWN)
476  ereport(ERROR,
477  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
478  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
479  MySubscription->name),
480  errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
481 
482  return rel->state == SUBREL_STATE_READY;
483 
484  case WORKERTYPE_APPLY:
485  return (rel->state == SUBREL_STATE_READY ||
486  (rel->state == SUBREL_STATE_SYNCDONE &&
487  rel->statelsn <= remote_final_lsn));
488 
489  case WORKERTYPE_UNKNOWN:
490  /* Should never happen. */
491  elog(ERROR, "Unknown worker type");
492  }
493 
494  return false; /* dummy for compiler */
495 }
LogicalRepWorkerType type
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY

References elog, ereport, errcode(), errdetail(), errmsg(), ERROR, LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 737 of file worker.c.

739 {
740  TupleDesc desc = RelationGetDescr(rel->localrel);
741  int num_phys_attrs = desc->natts;
742  int i;
743  int attnum,
744  num_defaults = 0;
745  int *defmap;
746  ExprState **defexprs;
747  ExprContext *econtext;
748 
749  econtext = GetPerTupleExprContext(estate);
750 
751  /* We got all the data via replication, no need to evaluate anything. */
752  if (num_phys_attrs == rel->remoterel.natts)
753  return;
754 
755  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
756  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
757 
758  Assert(rel->attrmap->maplen == num_phys_attrs);
759  for (attnum = 0; attnum < num_phys_attrs; attnum++)
760  {
761  Expr *defexpr;
762 
763  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
764  continue;
765 
766  if (rel->attrmap->attnums[attnum] >= 0)
767  continue;
768 
769  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
770 
771  if (defexpr != NULL)
772  {
773  /* Run the expression through planner */
774  defexpr = expression_planner(defexpr);
775 
776  /* Initialize executable expression in copycontext */
777  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
778  defmap[num_defaults] = attnum;
779  num_defaults++;
780  }
781  }
782 
783  for (i = 0; i < num_defaults; i++)
784  slot->tts_values[defmap[i]] =
785  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
786 }
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:134
#define GetPerTupleExprContext(estate)
Definition: executor.h:550
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:333
int16 attnum
Definition: pg_attribute.h:74
Expr * expression_planner(Expr *expr)
Definition: planner.c:6580
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 895 of file worker.c.

898 {
899  int natts = slot->tts_tupleDescriptor->natts;
900  int i;
901 
902  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
903  ExecClearTuple(slot);
904 
905  /*
906  * Copy all the column data from srcslot, so that we'll have valid values
907  * for unreplaced columns.
908  */
909  Assert(natts == srcslot->tts_tupleDescriptor->natts);
910  slot_getallattrs(srcslot);
911  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
912  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
913 
914  /* Call the "in" function for each replaced attribute */
915  Assert(natts == rel->attrmap->maplen);
916  for (i = 0; i < natts; i++)
917  {
918  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
919  int remoteattnum = rel->attrmap->attnums[i];
920 
921  if (remoteattnum < 0)
922  continue;
923 
924  Assert(remoteattnum < tupleData->ncols);
925 
926  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
927  {
928  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
929 
930  /* Set attnum for error callback */
931  apply_error_callback_arg.remote_attnum = remoteattnum;
932 
933  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
934  {
935  Oid typinput;
936  Oid typioparam;
937 
938  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
939  slot->tts_values[i] =
940  OidInputFunctionCall(typinput, colvalue->data,
941  typioparam, att->atttypmod);
942  slot->tts_isnull[i] = false;
943  }
944  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
945  {
946  Oid typreceive;
947  Oid typioparam;
948 
949  /*
950  * In some code paths we may be asked to re-parse the same
951  * tuple data. Reset the StringInfo's cursor so that works.
952  */
953  colvalue->cursor = 0;
954 
955  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
956  slot->tts_values[i] =
957  OidReceiveFunctionCall(typreceive, colvalue,
958  typioparam, att->atttypmod);
959 
960  /* Trouble if it didn't eat the whole buffer */
961  if (colvalue->cursor != colvalue->len)
962  ereport(ERROR,
963  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
964  errmsg("incorrect binary data format in logical replication column %d",
965  remoteattnum + 1)));
966  slot->tts_isnull[i] = false;
967  }
968  else
969  {
970  /* must be LOGICALREP_COLUMN_NULL */
971  slot->tts_values[i] = (Datum) 0;
972  slot->tts_isnull[i] = true;
973  }
974 
975  /* Reset attnum for error callback */
976  apply_error_callback_arg.remote_attnum = -1;
977  }
978  }
979 
980  /* And finally, declare that "slot" contains a valid virtual tuple */
981  ExecStoreVirtualTuple(slot);
982 }
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1639
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1754
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2874
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2940
StringInfoData * colvalues
Definition: logicalproto.h:87

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 794 of file worker.c.

796 {
797  int natts = slot->tts_tupleDescriptor->natts;
798  int i;
799 
800  ExecClearTuple(slot);
801 
802  /* Call the "in" function for each non-dropped, non-null attribute */
803  Assert(natts == rel->attrmap->maplen);
804  for (i = 0; i < natts; i++)
805  {
806  Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
807  int remoteattnum = rel->attrmap->attnums[i];
808 
809  if (!att->attisdropped && remoteattnum >= 0)
810  {
811  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
812 
813  Assert(remoteattnum < tupleData->ncols);
814 
815  /* Set attnum for error callback */
816  apply_error_callback_arg.remote_attnum = remoteattnum;
817 
818  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
819  {
820  Oid typinput;
821  Oid typioparam;
822 
823  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
824  slot->tts_values[i] =
825  OidInputFunctionCall(typinput, colvalue->data,
826  typioparam, att->atttypmod);
827  slot->tts_isnull[i] = false;
828  }
829  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
830  {
831  Oid typreceive;
832  Oid typioparam;
833 
834  /*
835  * In some code paths we may be asked to re-parse the same
836  * tuple data. Reset the StringInfo's cursor so that works.
837  */
838  colvalue->cursor = 0;
839 
840  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
841  slot->tts_values[i] =
842  OidReceiveFunctionCall(typreceive, colvalue,
843  typioparam, att->atttypmod);
844 
845  /* Trouble if it didn't eat the whole buffer */
846  if (colvalue->cursor != colvalue->len)
847  ereport(ERROR,
848  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
849  errmsg("incorrect binary data format in logical replication column %d",
850  remoteattnum + 1)));
851  slot->tts_isnull[i] = false;
852  }
853  else
854  {
855  /*
856  * NULL value from remote. (We don't expect to see
857  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
858  * NULL.)
859  */
860  slot->tts_values[i] = (Datum) 0;
861  slot->tts_isnull[i] = true;
862  }
863 
864  /* Reset attnum for error callback */
865  apply_error_callback_arg.remote_attnum = -1;
866  }
867  else
868  {
869  /*
870  * We assign NULL to dropped attributes and missing values
871  * (missing values should be later filled using
872  * slot_fill_defaults).
873  */
874  slot->tts_values[i] = (Datum) 0;
875  slot->tts_isnull[i] = true;
876  }
877  }
878 
879  ExecStoreVirtualTuple(slot);
880 }

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 4404 of file worker.c.

4405 {
4406  PG_TRY();
4407  {
4408  LogicalRepApplyLoop(origin_startpos);
4409  }
4410  PG_CATCH();
4411  {
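4412  if (MySubscription->disableonerr)
4413  DisableSubscriptionAndExit();
4414  else
4415  {
4416  /*
4417  * Report the worker failed while applying changes. Abort the
4418  * current transaction so that the stats message is sent in an
4419  * idle state.
4420  */
4421  AbortOutOfAnyTransaction();
4422  pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4423 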
4424  PG_RE_THROW();
4425  }
4426  }
4427  PG_END_TRY();
4428 }
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3475
void DisableSubscriptionAndExit(void)
Definition: worker.c:4685
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 4770 of file worker.c.

4771 {
4772  if (!is_skipping_changes())
4773  return;
4774 
4775  ereport(LOG,
4776  (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4777  LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4778 
4779  /* Stop skipping changes */
4780  skip_xact_finish_lsn = InvalidXLogRecPtr;
4781 }

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3433 of file worker.c.

3434 {
3435  FlushPosition *flushpos;
3436 
3437  /*
3438  * Skip for parallel apply workers, because the lsn_mapping is maintained
3439  * by the leader apply worker.
3440  */
3441  if (am_parallel_apply_worker())
3442  return;
3443 
3444  /* Need to do this in permanent context */
3445  MemoryContextSwitchTo(ApplyContext);
3446 
3447  /* Track commit lsn */
3448  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3449  flushpos->local_end = local_lsn;
3450  flushpos->remote_end = remote_lsn;
3451 
3452  dlist_push_tail(&lsn_mapping, &flushpos->node);
3453  MemoryContextSwitchTo(ApplyMessageContext);
3454 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:199

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_abort_internal()

static void stream_abort_internal ( TransactionId  xid,
TransactionId  subxid 
)
static

Definition at line 1726 of file worker.c.

1727 {
1728  /*
1729  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1730  * just delete the files with serialized info.
1731  */
1732  if (xid == subxid)
1734  else
1735  {
1736  /*
1737  * OK, so it's a subxact. We need to read the subxact file for the
1738  * toplevel transaction, determine the offset tracked for the subxact,
1739  * and truncate the file with changes. We also remove the subxacts
1740  * with higher offsets (or rather higher XIDs).
1741  *
1742  * We intentionally scan the array from the tail, because we're likely
1743  * aborting a change for the most recent subtransactions.
1744  *
1745  * We can't use the binary search here as subxact XIDs won't
1746  * necessarily arrive in sorted order, consider the case where we have
1747  * released the savepoint for multiple subtransactions and then
1748  * performed rollback to savepoint for one of the earlier
1749  * sub-transaction.
1750  */
1751  int64 i;
1752  int64 subidx;
1753  BufFile *fd;
1754  bool found = false;
1755  char path[MAXPGPATH];
1756 
1757  subidx = -1;
1760 
1761  for (i = subxact_data.nsubxacts; i > 0; i--)
1762  {
1763  if (subxact_data.subxacts[i - 1].xid == subxid)
1764  {
1765  subidx = (i - 1);
1766  found = true;
1767  break;
1768  }
1769  }
1770 
1771  /*
1772  * If it's an empty sub-transaction then we will not find the subxid
1773  * here so just cleanup the subxact info and return.
1774  */
1775  if (!found)
1776  {
1777  /* Cleanup the subxact info */
1781  return;
1782  }
1783 
1784  /* open the changes file */
1787  O_RDWR, false);
1788 
1789  /* OK, truncate the file at the right offset */
1791  subxact_data.subxacts[subidx].offset);
1792  BufFileClose(fd);
1793 
1794  /* discard the subxacts added later */
1795  subxact_data.nsubxacts = subidx;
1796 
1797  /* write the updated subxact list */
1799 
1802  }
1803 }
static void cleanup_subxact_info(void)
Definition: worker.c:4385
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4003
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4052
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:928
off_t offset
Definition: worker.c:345
TransactionId xid
Definition: worker.c:343
int fileno
Definition: worker.c:344

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), fd(), SubXactInfo::fileno, i, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_handle_stream_abort().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 4202 of file worker.c.

4203 {
4204  char path[MAXPGPATH];
4205 
4206  /* Delete the changes file. */
4207  changes_filename(path, subid, xid);
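4208  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4209 
4210  /* Delete the subxact file, if it exists. */
4211  subxact_filename(path, subid, xid);
4212  BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4213 }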
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4181
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_close_file()

static void stream_close_file ( void  )
static

Definition at line 4271 of file worker.c.

4272 {
4273  Assert(stream_fd != NULL);
4274 
4275  BufFileClose(stream_fd);
4276 
4277  stream_fd = NULL;
4278 }

References Assert, BufFileClose(), and stream_fd.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_spooled_messages(), and stream_stop_internal().

◆ stream_open_and_write_change()

static void stream_open_and_write_change ( TransactionId  xid,
char  action,
StringInfo  s 
)
static

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 4226 of file worker.c.

4227 {
4228  char path[MAXPGPATH];
4229  MemoryContext oldcxt;
4230 
4231  Assert(OidIsValid(subid));
4233  Assert(stream_fd == NULL);
4234 
4235 
4236  changes_filename(path, subid, xid);
4237  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4238 
4239  /*
4240  * Create/open the buffiles under the logical streaming context so that we
4241  * have those files until stream stop.
4242  */
4244 
4245  /*
4246  * If this is the first streamed segment, create the changes file.
4247  * Otherwise, just open the file for writing, in append mode.
4248  */
4249  if (first_segment)
4251  path);
4252  else
4253  {
4254  /*
4255  * Open the file and seek to the end of the file because we always
4256  * append the changes file.
4257  */
4259  path, O_RDWR, false);
4260  BufFileSeek(stream_fd, 0, 0, SEEK_END);
4261  }
4262 
4263  MemoryContextSwitchTo(oldcxt);
4264 }
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267

References Assert, BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by stream_start_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1426 of file worker.c.

1427 {
1429 
1430  /*
1431  * Initialize the worker's stream_fileset if we haven't yet. This will be
1432  * used for the entire duration of the worker so create it in a permanent
1433  * context. We create this on the very first streaming message from any
1434  * transaction and then use it for this and other streaming transactions.
1435  * Now, we could create a fileset at the start of the worker as well but
1436  * then we won't be sure that it will ever be used.
1437  */
1439  {
1440  MemoryContext oldctx;
1441 
1443 
1446 
1447  MemoryContextSwitchTo(oldctx);
1448  }
1449 
1450  /* Open the spool file for this transaction. */
1451  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1452 
1453  /* If this is not the first segment, open existing subxact file. */
1454  if (!first_segment)
1456 
1458 }
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4226
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1600 of file worker.c.

1601 {
1602  /*
1603  * Serialize information about subxacts for the toplevel transaction, then
1604  * close the stream messages spool file.
1605  */
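1606  subxact_info_write(MyLogicalRepWorker->subid, xid);
1607  stream_close_file();
1608 
1609  /* We must be in a valid transaction state */
1610  Assert(IsTransactionState());
1611 
1612  /* Commit the per-stream transaction */
1613  CommitTransactionCommand();
1614 
1615  /* Reset per-stream context */
1616  MemoryContextReset(LogicalStreamingContext);
1617 }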

References Assert, CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 4289 of file worker.c.

4290 {
4291  int len;
4292 
4293  Assert(stream_fd != NULL);
4294 
4295  /* total on-disk size, including the action type character */
4296  len = (s->len - s->cursor) + sizeof(char);
4297 
4298  /* first write the size */
4299  BufFileWrite(stream_fd, &len, sizeof(len));
4300 
4301  /* then the action */
4302  BufFileWrite(stream_fd, &action, sizeof(action));
4303 
4304  /* and finally the remaining part of the buffer (after the XID) */
4305  len = (s->len - s->cursor);
4306 
4307  BufFileWrite(stream_fd, &s->data[s->cursor], len);
4308 }
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676

References action, Assert, BufFileWrite(), StringInfoData::cursor, StringInfoData::data, StringInfoData::len, len, and stream_fd.

Referenced by apply_handle_stream_start(), apply_handle_stream_stop(), handle_streamed_transaction(), and stream_open_and_write_change().

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 3988 of file worker.c.

3989 {
      /*
       * Mark the cached subscription state as stale by clearing
       * MySubscriptionValid.  None of the arguments (arg, cacheid, hashvalue)
       * are consulted.  Presumably registered as a cache-invalidation callback
       * by InitializeLogRepWorker() -- confirm there.
       */
3990  MySubscriptionValid = false;
3991 }

References MySubscriptionValid.

Referenced by InitializeLogRepWorker().

◆ subxact_filename()

static void subxact_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 4181 of file worker.c.

4182 {
      /*
       * Format the subxact file name "<subid>-<xid>.subxacts" into path.  At
       * most MAXPGPATH bytes are written, so path must have room for that
       * many.  The snprintf result is not checked, so truncation would go
       * unreported.
       */
4183  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4184 }

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 4103 of file worker.c.

4104 {
4105  SubXactInfo *subxacts = subxact_data.subxacts;
4106  int64 i;
4107 
4108  /* We must have a valid top level stream xid and a stream fd. */
4110  Assert(stream_fd != NULL);
4111 
4112  /*
4113  * If the XID matches the toplevel transaction, we don't want to add it.
4114  */
4115  if (stream_xid == xid)
4116  return;
4117 
4118  /*
4119  * In most cases we're checking the same subxact as we've already seen in
4120  * the last call, so make sure to ignore it (this change comes later).
4121  */
4122  if (subxact_data.subxact_last == xid)
4123  return;
4124 
4125  /* OK, remember we're processing this XID. */
4126  subxact_data.subxact_last = xid;
4127 
4128  /*
4129  * Check if the transaction is already present in the array of subxact. We
4130  * intentionally scan the array from the tail, because we're likely adding
4131  * a change for the most recent subtransactions.
4132  *
4133  * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4134  * would allow us to use binary search here.
4135  */
4136  for (i = subxact_data.nsubxacts; i > 0; i--)
4137  {
4138  /* found, so we're done */
4139  if (subxacts[i - 1].xid == xid)
4140  return;
4141  }
4142 
4143  /* This is a new subxact, so we need to add it to the array. */
4144  if (subxact_data.nsubxacts == 0)
4145  {
4146  MemoryContext oldctx;
4147 
4149 
4150  /*
4151  * Allocate this memory for subxacts in per-stream context, see
4152  * subxact_info_read.
4153  */
4155  subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4156  MemoryContextSwitchTo(oldctx);
4157  }
4159  {
4161  subxacts = repalloc(subxacts,
4163  }
4164 
4165  subxacts[subxact_data.nsubxacts].xid = xid;
4166 
4167  /*
4168  * Get the current offset of the stream file and store it as offset of
4169  * this subxact.
4170  */
4172  &subxacts[subxact_data.nsubxacts].fileno,
4173  &subxacts[subxact_data.nsubxacts].offset);
4174 
4176  subxact_data.subxacts = subxacts;
4177 }

References Assert, BufFileTell(), SubXactInfo::fileno, i, LogicalStreamingContext, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc(), repalloc(), stream_fd, stream_xid, subxact_data, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

◆ subxact_info_read()

static void subxact_info_read ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 4052 of file worker.c.

4053 {
4054  char path[MAXPGPATH];
4055  Size len;
4056  BufFile *fd;
4057  MemoryContext oldctx;
4058 
4062 
4063  /*
4064  * If the subxact file doesn't exist that means we don't have any subxact
4065  * info.
4066  */
4067  subxact_filename(path, subid, xid);
4069  true);
4070  if (fd == NULL)
4071  return;
4072 
4073  /* read number of subxact items */
4075 
4076  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4077 
4078  /* we keep the maximum as a power of 2 */
4080 
4081  /*
4082  * Allocate subxact information in the logical streaming context. We need
4083  * this information during the complete stream so that we can add the sub
4084  * transaction info to this. On stream stop we will flush this information
4085  * to the subxact file and reset the logical streaming context.
4086  */
4089  sizeof(SubXactInfo));
4090  MemoryContextSwitchTo(oldctx);
4091 
4092  if (len > 0)
4094 
4095  BufFileClose(fd);
4096 }
struct SubXactInfo SubXactInfo
size_t Size
Definition: c.h:605
int my_log2(long num)
Definition: dynahash.c:1751

References Assert, BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), fd(), len, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), my_log2(), MyLogicalRepWorker, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc(), LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and stream_start_internal().

◆ subxact_info_write()

static void subxact_info_write ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 4003 of file worker.c.

4004 {
4005  char path[MAXPGPATH];
4006  Size len;
4007  BufFile *fd;
4008 
4010 
4011  /* construct the subxact filename */
4012  subxact_filename(path, subid, xid);
4013 
4014  /* Delete the subxacts file, if exists. */
4015  if (subxact_data.nsubxacts == 0)
4016  {
4019 
4020  return;
4021  }
4022 
4023  /*
4024  * Create the subxact file if it is not already created, otherwise open the
4025  * existing file.
4026  */
4028  true);
4029  if (fd == NULL)
4031 
4032  len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4033 
4034  /* Write the subxact count and subxact info */
4037 
4038  BufFileClose(fd);
4039 
4040  /* free the memory allocated for subxact info */
4042 }

References Assert, BufFileClose(), BufFileCreateFileSet(), BufFileDeleteFileSet(), BufFileOpenFileSet(), BufFileWrite(), cleanup_subxact_info(), fd(), len, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by stream_abort_internal(), and stream_stop_internal().

◆ TargetPrivilegesCheck()

static void TargetPrivilegesCheck ( Relation  rel,
AclMode  mode 
)
static

Definition at line 2336 of file worker.c.

2337 {
      /*
       * Check that the current user (GetUserId()) may perform the operations
       * in "mode" on "rel", and refuse relations for which check_enable_rls()
       * reports RLS_ENABLED.  A failed ACL check is reported through
       * aclcheck_error(); the RLS case raises ERROR with
       * ERRCODE_FEATURE_NOT_SUPPORTED.
       */
2338  Oid relid;
2339  AclResult aclresult;
2340 
2341  relid = RelationGetRelid(rel);
      /* privilege check for the requested mode, against the current user */
2342  aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2343  if (aclresult != ACLCHECK_OK)
2344  aclcheck_error(aclresult,
2345  get_relkind_objtype(rel->rd_rel->relkind),
2346  get_rel_name(relid));
2347 
2348  /*
2349  * We lack the infrastructure to honor RLS policies. It might be possible
2350  * to add such infrastructure here, but tablesync workers lack it, too, so
2351  * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2352  * but it seems dangerous to replicate a TRUNCATE and then refuse to
2353  * replicate subsequent INSERTs, so we forbid all commands the same.
2354  */
      /* the error message names both the current user and the relation */
2355  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2356  ereport(ERROR,
2357  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2358  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2359  GetUserNameFromId(GetUserId(), true),
2360  RelationGetRelationName(rel))));
2361 }
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2700
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4091
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:980
Oid GetUserId(void)
Definition: miscinit.c:514
ObjectType get_relkind_objtype(char relkind)
static PgChecksumMode mode
Definition: pg_checksums.c:56
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45

References aclcheck_error(), ACLCHECK_OK, check_enable_rls(), ereport, errcode(), errmsg(), ERROR, get_rel_name(), get_relkind_objtype(), GetUserId(), GetUserNameFromId(), InvalidOid, mode, pg_class_aclcheck(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, and RLS_ENABLED.

Referenced by apply_handle_delete_internal(), apply_handle_insert_internal(), apply_handle_truncate(), apply_handle_tuple_routing(), apply_handle_update_internal(), and FindReplTupleInLocalRel().

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

Variable Documentation

◆ apply_error_callback_arg

ApplyErrorCallbackArg apply_error_callback_arg
static
Initial value:
=
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}

Definition at line 278 of file worker.c.

Referenced by apply_dispatch(), apply_error_callback(), apply_handle_delete(), apply_handle_insert(), apply_handle_update(), reset_apply_error_context_info(), set_apply_error_context_origin(), set_apply_error_context_xact(), slot_modify_data(), and slot_store_data().

◆ apply_error_context_stack

ErrorContextCallback* apply_error_context_stack = NULL

Definition at line 288 of file worker.c.

Referenced by HandleParallelApplyMessage(), and LogicalRepApplyLoop().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ in_streamed_transaction

◆ InitializingApplyWorker

bool InitializingApplyWorker = false

Definition at line 318 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogicalStreamingContext

MemoryContext LogicalStreamingContext = NULL
static

◆ LogRepWorkerWalRcvConn

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 204 of file worker.c.

Referenced by get_flush_position(), LogicalRepApplyLoop(), and store_flush_position().

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false
static

◆ on_commit_wakeup_workers_subids

List* on_commit_wakeup_workers_subids = NIL
static

Definition at line 301 of file worker.c.

Referenced by AtEOXact_LogicalRepWorkers(), and LogicalRepWorkersWakeupAtCommit().

◆ parallel_stream_nchanges

uint32 parallel_stream_nchanges = 0
static

◆ remote_final_lsn

◆ skip_xact_finish_lsn

XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
static

Definition at line 335 of file worker.c.

Referenced by maybe_start_skipping_changes(), and stop_skipping_changes().

◆ stream_fd

◆ stream_xid

◆ subxact_data