PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionDatacreate_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void apply_worker_exit (void)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker ()
 
void InitializeLogRepWorker (void)
 
static void replorigin_reset (int code, Datum arg)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallbackapply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnLogRepWorkerWalRcvConn = NULL
 
SubscriptionMySubscription = NULL
 
static bool MySubscriptionValid = false
 
static Liston_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFilestream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 337 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 196 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

◆ ApplyExecutionData

◆ ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 266 of file worker.c.

267 {
268  /* The action for non-streaming transactions. */
270 
271  /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:267
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:272
@ TRANS_PARALLEL_APPLY
Definition: worker.c:275
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:273
@ TRANS_LEADER_APPLY
Definition: worker.c:269
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:274

Function Documentation

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3364 of file worker.c.

3365 {
3367  LogicalRepMsgType saved_command;
3368 
3369  /*
3370  * Set the current command being applied. Since this function can be
3371  * called recursively when applying spooled changes, save the current
3372  * command.
3373  */
3374  saved_command = apply_error_callback_arg.command;
3376 
3377  switch (action)
3378  {
3379  case LOGICAL_REP_MSG_BEGIN:
3380  apply_handle_begin(s);
3381  break;
3382 
3385  break;
3386 
3389  break;
3390 
3393  break;
3394 
3397  break;
3398 
3401  break;
3402 
3405  break;
3406 
3407  case LOGICAL_REP_MSG_TYPE:
3408  apply_handle_type(s);
3409  break;
3410 
3413  break;
3414 
3416 
3417  /*
3418  * Logical replication does not use generic logical messages yet.
3419  * Although, it could be used by other applications that use this
3420  * output plugin.
3421  */
3422  break;
3423 
3426  break;
3427 
3430  break;
3431 
3434  break;
3435 
3438  break;
3439 
3442  break;
3443 
3446  break;
3447 
3450  break;
3451 
3454  break;
3455 
3458  break;
3459 
3460  default:
3461  ereport(ERROR,
3462  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3463  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3464  }
3465 
3466  /* Reset the current command */
3467  apply_error_callback_arg.command = saved_command;
3468 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1284
static void apply_handle_type(StringInfo s)
Definition: worker.c:2345
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3236
static void apply_handle_update(StringInfo s)
Definition: worker.c:2542
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2152
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1177
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:279
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2752
static void apply_handle_begin(StringInfo s)
Definition: worker.c:989
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1014
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1833
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2322
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1106
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1226
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1647
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1429
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1040
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1488
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2392
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:222

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4992 of file worker.c.

4993 {
4995  int elevel;
4996 
4998  return;
4999 
5000  Assert(errarg->origin_name);
5001 
5002  elevel = geterrlevel();
5003 
5004  /*
5005  * Reset the origin state to prevent the advancement of origin progress if
5006  * we fail to apply. Otherwise, this will result in transaction loss as
5007  * that transaction won't be sent again by the server.
5008  */
5009  if (elevel >= ERROR)
5010  replorigin_reset(0, (Datum) 0);
5011 
5012  if (errarg->rel == NULL)
5013  {
5014  if (!TransactionIdIsValid(errarg->remote_xid))
5015  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5016  errarg->origin_name,
5017  logicalrep_message_type(errarg->command));
5018  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5019  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5020  errarg->origin_name,
5022  errarg->remote_xid);
5023  else
5024  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5025  errarg->origin_name,
5027  errarg->remote_xid,
5028  LSN_FORMAT_ARGS(errarg->finish_lsn));
5029  }
5030  else
5031  {
5032  if (errarg->remote_attnum < 0)
5033  {
5034  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5035  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5036  errarg->origin_name,
5038  errarg->rel->remoterel.nspname,
5039  errarg->rel->remoterel.relname,
5040  errarg->remote_xid);
5041  else
5042  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5043  errarg->origin_name,
5045  errarg->rel->remoterel.nspname,
5046  errarg->rel->remoterel.relname,
5047  errarg->remote_xid,
5048  LSN_FORMAT_ARGS(errarg->finish_lsn));
5049  }
5050  else
5051  {
5052  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5053  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5054  errarg->origin_name,
5056  errarg->rel->remoterel.nspname,
5057  errarg->rel->remoterel.relname,
5058  errarg->rel->remoterel.attnames[errarg->remote_attnum],
5059  errarg->remote_xid);
5060  else
5061  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5062  errarg->origin_name,
5064  errarg->rel->remoterel.nspname,
5065  errarg->rel->remoterel.relname,
5066  errarg->rel->remoterel.attnames[errarg->remote_attnum],
5067  errarg->remote_xid,
5068  LSN_FORMAT_ARGS(errarg->finish_lsn));
5069  }
5070  }
5071 }
static void replorigin_reset(int code, Datum arg)
Definition: worker.c:4721
#define Assert(condition)
Definition: c.h:858
int geterrlevel(void)
Definition: elog.c:1578
#define errcontext
Definition: elog.h:196
uintptr_t Datum
Definition: postgres.h:64
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:227
XLogRecPtr finish_lsn
Definition: worker.c:228
LogicalRepRelMapEntry * rel
Definition: worker.c:223
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert, LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ERROR, ApplyErrorCallbackArg::finish_lsn, geterrlevel(), logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, replorigin_reset(), TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 989 of file worker.c.

990 {
991  LogicalRepBeginData begin_data;
992 
993  /* There must not be an active streaming transaction. */
995 
996  logicalrep_read_begin(s, &begin_data);
997  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
998 
999  remote_final_lsn = begin_data.final_lsn;
1000 
1002 
1003  in_remote_transaction = true;
1004 
1006 }
bool in_remote_transaction
Definition: worker.c:304
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:5075
static XLogRecPtr remote_final_lsn
Definition: worker.c:305
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4862
static TransactionId stream_xid
Definition: worker.c:310
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert, LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1040 of file worker.c.

1041 {
1042  LogicalRepPreparedTxnData begin_data;
1043 
1044  /* Tablesync should never receive prepare. */
1045  if (am_tablesync_worker())
1046  ereport(ERROR,
1047  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1048  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1049 
1050  /* There must not be an active streaming transaction. */
1052 
1053  logicalrep_read_begin_prepare(s, &begin_data);
1054  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1055 
1056  remote_final_lsn = begin_data.prepare_lsn;
1057 
1059 
1060  in_remote_transaction = true;
1061 
1063 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert, ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1014 of file worker.c.

1015 {
1016  LogicalRepCommitData commit_data;
1017 
1018  logicalrep_read_commit(s, &commit_data);
1019 
1020  if (commit_data.commit_lsn != remote_final_lsn)
1021  ereport(ERROR,
1022  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1023  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1024  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1026 
1027  apply_handle_commit_internal(&commit_data);
1028 
1029  /* Process any tables that are being synchronized in parallel. */
1030  process_syncing_tables(commit_data.end_lsn);
1031 
1034 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2262
static void reset_apply_error_context_info(void)
Definition: worker.c:5083
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData commit_data)
static

Definition at line 2262 of file worker.c.

2263 {
2264  if (is_skipping_changes())
2265  {
2267 
2268  /*
2269  * Start a new transaction to clear the subskiplsn, if not started
2270  * yet.
2271  */
2272  if (!IsTransactionState())
2274  }
2275 
2276  if (IsTransactionState())
2277  {
2278  /*
2279  * The transaction is either non-empty or skipped, so we clear the
2280  * subskiplsn.
2281  */
2283 
2284  /*
2285  * Update origin state so we can restart streaming from correct
2286  * position in case of crash.
2287  */
2288  replorigin_session_origin_lsn = commit_data->end_lsn;
2290 
2292 
2293  if (IsTransactionBlock())
2294  {
2295  EndTransactionBlock(false);
2297  }
2298 
2299  pgstat_report_stat(false);
2300 
2302  }
2303  else
2304  {
2305  /* Process any invalidation messages that might have accumulated. */
2308  }
2309 
2310  in_remote_transaction = false;
2311 }
static void stop_skipping_changes(void)
Definition: worker.c:4889
#define is_skipping_changes()
Definition: worker.c:337
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4911
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3528
void maybe_reread_subscription(void)
Definition: worker.c:3954
void AcceptInvalidationMessages(void)
Definition: inval.c:806
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:161
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:160
long pgstat_report_stat(bool force)
Definition: pgstat.c:660
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:386
void StartTransactionCommand(void)
Definition: xact.c:3039
bool IsTransactionBlock(void)
Definition: xact.c:4964
void CommitTransactionCommand(void)
Definition: xact.c:3137
bool EndTransactionBlock(bool chain)
Definition: xact.c:4037
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:254

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1177 of file worker.c.

1178 {
1179  LogicalRepCommitPreparedTxnData prepare_data;
1180  char gid[GIDSIZE];
1181 
1182  logicalrep_read_commit_prepared(s, &prepare_data);
1183  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1184 
1185  /* Compute GID for two_phase transactions. */
1187  gid, sizeof(gid));
1188 
1189  /* There is no transaction when COMMIT PREPARED is called */
1191 
1192  /*
1193  * Update origin state so we can restart streaming from correct position
1194  * in case of crash.
1195  */
1196  replorigin_session_origin_lsn = prepare_data.end_lsn;
1198 
1199  FinishPreparedTransaction(gid, true);
1202  pgstat_report_stat(false);
1203 
1205  in_remote_transaction = false;
1206 
1207  /* Process any tables that are being synchronized in parallel. */
1208  process_syncing_tables(prepare_data.end_lsn);
1209 
1210  clear_subscription_skip_lsn(prepare_data.end_lsn);
1211 
1214 }
static void begin_replication_step(void)
Definition: worker.c:506
static void end_replication_step(void)
Definition: worker.c:529
Subscription * MySubscription
Definition: worker.c:299
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2692
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1503
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2752 of file worker.c.

2753 {
2754  LogicalRepRelMapEntry *rel;
2755  LogicalRepTupleData oldtup;
2756  LogicalRepRelId relid;
2757  UserContext ucxt;
2758  ApplyExecutionData *edata;
2759  EState *estate;
2760  TupleTableSlot *remoteslot;
2761  MemoryContext oldctx;
2762  bool run_as_owner;
2763 
2764  /*
2765  * Quick return if we are skipping data modification changes or handling
2766  * streamed transactions.
2767  */
2768  if (is_skipping_changes() ||
2770  return;
2771 
2773 
2774  relid = logicalrep_read_delete(s, &oldtup);
2775  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2776  if (!should_apply_changes_for_rel(rel))
2777  {
2778  /*
2779  * The relation can't become interesting in the middle of the
2780  * transaction so it's safe to unlock it.
2781  */
2784  return;
2785  }
2786 
2787  /* Set relation for error callback */
2789 
2790  /* Check if we can do the delete. */
2792 
2793  /*
2794  * Make sure that any user-supplied code runs as the table owner, unless
2795  * the user has opted out of that behavior.
2796  */
2797  run_as_owner = MySubscription->runasowner;
2798  if (!run_as_owner)
2799  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2800 
2801  /* Initialize the executor state. */
2802  edata = create_edata_for_relation(rel);
2803  estate = edata->estate;
2804  remoteslot = ExecInitExtraTupleSlot(estate,
2805  RelationGetDescr(rel->localrel),
2806  &TTSOpsVirtual);
2807 
2808  /* Build the search tuple. */
2810  slot_store_data(remoteslot, rel, &oldtup);
2811  MemoryContextSwitchTo(oldctx);
2812 
2813  /* For a partitioned table, apply delete to correct partition. */
2814  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2816  remoteslot, NULL, CMD_DELETE);
2817  else
2819  remoteslot, rel->localindexoid);
2820 
2821  finish_edata(edata);
2822 
2823  /* Reset relation for error callback */
2825 
2826  if (!run_as_owner)
2827  RestoreUserContext(&ucxt);
2828 
2830 
2832 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2501
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:650
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:466
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:557
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2954
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:795
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:707
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2840
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1918
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:566
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:268
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
Definition: rel.h:531
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:473
ResultRelInfo * targetRelInfo
Definition: worker.c:212
EState * estate
Definition: worker.c:209
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 2840 of file worker.c.

2844 {
2845  EState *estate = edata->estate;
2846  Relation localrel = relinfo->ri_RelationDesc;
2847  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2848  EPQState epqstate;
2849  TupleTableSlot *localslot;
2850  bool found;
2851 
2852  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2853  ExecOpenIndices(relinfo, false);
2854 
2855  found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2856  remoteslot, &localslot);
2857 
2858  /* If found delete it. */
2859  if (found)
2860  {
2861  RepOriginId localorigin;
2862  TransactionId localxmin;
2863  TimestampTz localts;
2864 
2865  /*
2866  * Report the conflict if the tuple was modified by a different
2867  * origin.
2868  */
2869  if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2870  localorigin != replorigin_session_origin)
2872  remoteslot, localslot, NULL,
2873  InvalidOid, localxmin, localorigin, localts);
2874 
2875  EvalPlanQualSetSlot(&epqstate, localslot);
2876 
2877  /* Do the actual delete. */
2879  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2880  }
2881  else
2882  {
2883  /*
2884  * The tuple to be deleted could not be found. Do nothing except for
2885  * emitting a log message.
2886  */
2887  ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2888  remoteslot, NULL, NULL,
2890  InvalidRepOriginId, 0);
2891  }
2892 
2893  /* Cleanup. */
2894  ExecCloseIndices(relinfo);
2895  EvalPlanQualEnd(&epqstate);
2896 }
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2906
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2360
uint32 TransactionId
Definition: c.h:652
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts)
Definition: conflict.c:107
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:61
@ CT_DELETE_MISSING
Definition: conflict.h:42
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:39
int64 TimestampTz
Definition: timestamp.h:39
#define LOG
Definition: elog.h:31
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:236
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:160
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2534
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2977
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:248
RepOriginId replorigin_session_origin
Definition: origin.c:159
#define InvalidRepOriginId
Definition: origin.h:33
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define InvalidOid
Definition: postgres_ext.h:36
LogicalRepRelMapEntry * targetRel
Definition: worker.c:211
Relation ri_RelationDesc
Definition: execnodes.h:459
#define InvalidTransactionId
Definition: transam.h:31
uint16 RepOriginId
Definition: xlogdefs.h:65

References ACL_DELETE, CT_DELETE_MISSING, CT_DELETE_ORIGIN_DIFFERS, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), GetTupleTransactionInfo(), InvalidOid, InvalidRepOriginId, InvalidTransactionId, LOG, NIL, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2392 of file worker.c.

2393 {
2394  LogicalRepRelMapEntry *rel;
2395  LogicalRepTupleData newtup;
2396  LogicalRepRelId relid;
2397  UserContext ucxt;
2398  ApplyExecutionData *edata;
2399  EState *estate;
2400  TupleTableSlot *remoteslot;
2401  MemoryContext oldctx;
2402  bool run_as_owner;
2403 
2404  /*
2405  * Quick return if we are skipping data modification changes or handling
2406  * streamed transactions.
2407  */
2408  if (is_skipping_changes() ||
2410  return;
2411 
2413 
2414  relid = logicalrep_read_insert(s, &newtup);
2415  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2416  if (!should_apply_changes_for_rel(rel))
2417  {
2418  /*
2419  * The relation can't become interesting in the middle of the
2420  * transaction so it's safe to unlock it.
2421  */
2424  return;
2425  }
2426 
2427  /*
2428  * Make sure that any user-supplied code runs as the table owner, unless
2429  * the user has opted out of that behavior.
2430  */
2431  run_as_owner = MySubscription->runasowner;
2432  if (!run_as_owner)
2433  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2434 
2435  /* Set relation for error callback */
2437 
2438  /* Initialize the executor state. */
2439  edata = create_edata_for_relation(rel);
2440  estate = edata->estate;
2441  remoteslot = ExecInitExtraTupleSlot(estate,
2442  RelationGetDescr(rel->localrel),
2443  &TTSOpsVirtual);
2444 
2445  /* Process and store remote tuple in the slot */
2447  slot_store_data(remoteslot, rel, &newtup);
2448  slot_fill_defaults(rel, estate, remoteslot);
2449  MemoryContextSwitchTo(oldctx);
2450 
2451  /* For a partitioned table, insert the tuple into a partition. */
2452  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2454  remoteslot, NULL, CMD_INSERT);
2455  else
2457  remoteslot);
2458 
2459  finish_edata(edata);
2460 
2461  /* Reset relation for error callback */
2463 
2464  if (!run_as_owner)
2465  RestoreUserContext(&ucxt);
2466 
2468 
2470 }
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2478
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:738
@ CMD_INSERT
Definition: nodes.h:267
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2478 of file worker.c.

2481 {
2482  EState *estate = edata->estate;
2483 
2484  /* We must open indexes here. */
2485  ExecOpenIndices(relinfo, true);
2486  InitConflictIndexes(relinfo);
2487 
2488  /* Do the insert. */
2490  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2491 
2492  /* Cleanup. */
2493  ExecCloseIndices(relinfo);
2494 }
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:136
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), InitConflictIndexes(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1429 of file worker.c.

1430 {
1431  /*
1432  * ORIGIN message can only come inside streaming transaction or inside
1433  * remote transaction and before any actual writes.
1434  */
1435  if (!in_streamed_transaction &&
1438  ereport(ERROR,
1439  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1440  errmsg_internal("ORIGIN message sent out of order")));
1441 }
static bool in_streamed_transaction
Definition: worker.c:308

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1106 of file worker.c.

1107 {
1108  LogicalRepPreparedTxnData prepare_data;
1109 
1110  logicalrep_read_prepare(s, &prepare_data);
1111 
1112  if (prepare_data.prepare_lsn != remote_final_lsn)
1113  ereport(ERROR,
1114  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1115  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1116  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1118 
1119  /*
1120  * Unlike commit, here, we always prepare the transaction even though no
1121  * change has happened in this transaction or all changes are skipped. It
1122  * is done this way because at commit prepared time, we won't know whether
1123  * we have skipped preparing a transaction because of those reasons.
1124  *
1125  * XXX, We can optimize such that at commit prepared time, we first check
1126  * whether we have prepared the transaction or not but that doesn't seem
1127  * worthwhile because such cases shouldn't be common.
1128  */
1130 
1131  apply_handle_prepare_internal(&prepare_data);
1132 
1135  pgstat_report_stat(false);
1136 
1137  /*
1138  * It is okay not to set the local_end LSN for the prepare because we
1139  * always flush the prepare record. So, we can send the acknowledgment of
1140  * the remote_end LSN as soon as prepare is finished.
1141  *
1142  * XXX For the sake of consistency with commit, we could have set it with
1143  * the LSN of prepare but as of now we don't track that value similar to
1144  * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1145  * it.
1146  */
1148 
1149  in_remote_transaction = false;
1150 
1151  /* Process any tables that are being synchronized in parallel. */
1152  process_syncing_tables(prepare_data.end_lsn);
1153 
1154  /*
1155  * Since we have already prepared the transaction, in a case where the
1156  * server crashes before clearing the subskiplsn, it will be left but the
1157  * transaction won't be resent. But that's okay because it's a rare case
1158  * and the subskiplsn will be cleared when finishing the next transaction.
1159  */
1162 
1165 }
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1069
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1069 of file worker.c.

1070 {
1071  char gid[GIDSIZE];
1072 
1073  /*
1074  * Compute unique GID for two_phase transactions. We don't use GID of
1075  * prepared transaction sent by server as that can lead to deadlock when
1076  * we have multiple subscriptions from same node point to publications on
1077  * the same node. See comments atop worker.c
1078  */
1079  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1080  gid, sizeof(gid));
1081 
1082  /*
1083  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1084  * called within the PrepareTransactionBlock below.
1085  */
1086  if (!IsTransactionBlock())
1087  {
1089  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1090  }
1091 
1092  /*
1093  * Update origin state so we can restart streaming from correct position
1094  * in case of crash.
1095  */
1096  replorigin_session_origin_lsn = prepare_data->end_lsn;
1098 
1100 }
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3985
void BeginTransactionBlock(void)
Definition: xact.c:3917

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2322 of file worker.c.

2323 {
2324  LogicalRepRelation *rel;
2325 
2327  return;
2328 
2329  rel = logicalrep_read_rel(s);
2331 
2332  /* Also reset all entries in the partition map that refer to remoterel. */
2334 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:540
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1226 of file worker.c.

1227 {
1228  LogicalRepRollbackPreparedTxnData rollback_data;
1229  char gid[GIDSIZE];
1230 
1231  logicalrep_read_rollback_prepared(s, &rollback_data);
1232  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1233 
1234  /* Compute GID for two_phase transactions. */
1235  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1236  gid, sizeof(gid));
1237 
1238  /*
1239  * It is possible that we haven't received prepare because it occurred
1240  * before walsender reached a consistent point or the two_phase was still
1241  * not enabled by that time, so in such cases, we need to skip rollback
1242  * prepared.
1243  */
1244  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1245  rollback_data.prepare_time))
1246  {
1247  /*
1248  * Update origin state so we can restart streaming from correct
1249  * position in case of crash.
1250  */
1253 
1254  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1256  FinishPreparedTransaction(gid, false);
1259 
1261  }
1262 
1263  pgstat_report_stat(false);
1264 
1265  /*
1266  * It is okay not to set the local_end LSN for the rollback of prepared
1267  * transaction because we always flush the WAL record for it. See
1268  * apply_handle_prepare.
1269  */
1271  in_remote_transaction = false;
1272 
1273  /* Process any tables that are being synchronized in parallel. */
1275 
1278 }
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2633

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1833 of file worker.c.

1834 {
1835  TransactionId xid;
1836  TransactionId subxid;
1837  LogicalRepStreamAbortData abort_data;
1838  ParallelApplyWorkerInfo *winfo;
1839  TransApplyAction apply_action;
1840 
1841  /* Save the message before it is consumed. */
1842  StringInfoData original_msg = *s;
1843  bool toplevel_xact;
1844 
1846  ereport(ERROR,
1847  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1848  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1849 
1850  /* We receive abort information only when we can apply in parallel. */
1851  logicalrep_read_stream_abort(s, &abort_data,
1853 
1854  xid = abort_data.xid;
1855  subxid = abort_data.subxid;
1856  toplevel_xact = (xid == subxid);
1857 
1858  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1859 
1860  apply_action = get_transaction_apply_action(xid, &winfo);
1861 
1862  switch (apply_action)
1863  {
1864  case TRANS_LEADER_APPLY:
1865 
1866  /*
1867  * We are in the leader apply worker and the transaction has been
1868  * serialized to file.
1869  */
1870  stream_abort_internal(xid, subxid);
1871 
1872  elog(DEBUG1, "finished processing the STREAM ABORT command");
1873  break;
1874 
1876  Assert(winfo);
1877 
1878  /*
1879  * For the case of aborting the subtransaction, we increment the
1880  * number of streaming blocks and take the lock again before
1881  * sending the STREAM_ABORT to ensure that the parallel apply
1882  * worker will wait on the lock for the next set of changes after
1883  * processing the STREAM_ABORT message if it is not already
1884  * waiting for STREAM_STOP message.
1885  *
1886  * It is important to perform this locking before sending the
1887  * STREAM_ABORT message so that the leader can hold the lock first
1888  * and the parallel apply worker will wait for the leader to
1889  * release the lock. This is the same as what we do in
1890  * apply_handle_stream_stop. See Locking Considerations atop
1891  * applyparallelworker.c.
1892  */
1893  if (!toplevel_xact)
1894  {
1898  }
1899 
1900  if (pa_send_data(winfo, s->len, s->data))
1901  {
1902  /*
1903  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1904  * wait here for the parallel apply worker to finish as that
1905  * is not required to maintain the commit order and won't have
1906  * the risk of failures due to transaction dependencies and
1907  * deadlocks. However, it is possible that before the parallel
1908  * worker finishes and we clear the worker info, the xid
1909  * wraparound happens on the upstream and a new transaction
1910  * with the same xid can appear and that can lead to duplicate
1911  * entries in ParallelApplyTxnHash. Yet another problem could
1912  * be that we may have serialized the changes in partial
1913  * serialize mode and the file containing xact changes may
1914  * already exist, and after xid wraparound trying to create
1915  * the file for the same xid can lead to an error. To avoid
1916  * these problems, we decide to wait for the aborts to finish.
1917  *
1918  * Note, it is okay to not update the flush location position
1919  * for aborts as in worst case that means such a transaction
1920  * won't be sent again after restart.
1921  */
1922  if (toplevel_xact)
1924 
1925  break;
1926  }
1927 
1928  /*
1929  * Switch to serialize mode when we are not able to send the
1930  * change to parallel apply worker.
1931  */
1932  pa_switch_to_partial_serialize(winfo, true);
1933 
1934  /* fall through */
1936  Assert(winfo);
1937 
1938  /*
1939  * Parallel apply worker might have applied some changes, so write
1940  * the STREAM_ABORT message so that it can rollback the
1941  * subtransaction if needed.
1942  */
1944  &original_msg);
1945 
1946  if (toplevel_xact)
1947  {
1950  }
1951  break;
1952 
1953  case TRANS_PARALLEL_APPLY:
1954 
1955  /*
1956  * If the parallel apply worker is applying spooled messages then
1957  * close the file before aborting.
1958  */
1959  if (toplevel_xact && stream_fd)
1961 
1962  pa_stream_abort(&abort_data);
1963 
1964  /*
1965  * We need to wait after processing rollback to savepoint for the
1966  * next set of changes.
1967  *
1968  * We have a race condition here due to which we can start waiting
1969  * here when there are more chunk of streams in the queue. See
1970  * apply_handle_stream_stop.
1971  */
1972  if (!toplevel_xact)
1974 
1975  elog(DEBUG1, "finished processing the STREAM ABORT command");
1976  break;
1977 
1978  default:
1979  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1980  break;
1981  }
1982 
1984 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:424
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5160
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4414
static BufFile * stream_fd
Definition: worker.c:340
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1750
static void stream_close_file(void)
Definition: worker.c:4366
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:225
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2152 of file worker.c.

2153 {
2154  TransactionId xid;
2155  LogicalRepCommitData commit_data;
2156  ParallelApplyWorkerInfo *winfo;
2157  TransApplyAction apply_action;
2158 
2159  /* Save the message before it is consumed. */
2160  StringInfoData original_msg = *s;
2161 
2163  ereport(ERROR,
2164  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2165  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2166 
2167  xid = logicalrep_read_stream_commit(s, &commit_data);
2168  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2169 
2170  apply_action = get_transaction_apply_action(xid, &winfo);
2171 
2172  switch (apply_action)
2173  {
2174  case TRANS_LEADER_APPLY:
2175 
2176  /*
2177  * The transaction has been serialized to file, so replay all the
2178  * spooled operations.
2179  */
2181  commit_data.commit_lsn);
2182 
2183  apply_handle_commit_internal(&commit_data);
2184 
2185  /* Unlink the files with serialized changes and subxact info. */
2187 
2188  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2189  break;
2190 
2192  Assert(winfo);
2193 
2194  if (pa_send_data(winfo, s->len, s->data))
2195  {
2196  /* Finish processing the streaming transaction. */
2197  pa_xact_finish(winfo, commit_data.end_lsn);
2198  break;
2199  }
2200 
2201  /*
2202  * Switch to serialize mode when we are not able to send the
2203  * change to parallel apply worker.
2204  */
2205  pa_switch_to_partial_serialize(winfo, true);
2206 
2207  /* fall through */
2209  Assert(winfo);
2210 
2212  &original_msg);
2213 
2215 
2216  /* Finish processing the streaming transaction. */
2217  pa_xact_finish(winfo, commit_data.end_lsn);
2218  break;
2219 
2220  case TRANS_PARALLEL_APPLY:
2221 
2222  /*
2223  * If the parallel apply worker is applying spooled messages then
2224  * close the file before committing.
2225  */
2226  if (stream_fd)
2228 
2229  apply_handle_commit_internal(&commit_data);
2230 
2232 
2233  /*
2234  * It is important to set the transaction state as finished before
2235  * releasing the lock. See pa_wait_for_xact_finish.
2236  */
2239 
2241 
2242  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2243  break;
2244 
2245  default:
2246  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2247  break;
2248  }
2249 
2250  /* Process any tables that are being synchronized in parallel. */
2251  process_syncing_tables(commit_data.end_lsn);
2252 
2254 
2256 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4297
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2022
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert, LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1284 of file worker.c.

1285 {
1286  LogicalRepPreparedTxnData prepare_data;
1287  ParallelApplyWorkerInfo *winfo;
1288  TransApplyAction apply_action;
1289 
1290  /* Save the message before it is consumed. */
1291  StringInfoData original_msg = *s;
1292 
1294  ereport(ERROR,
1295  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1296  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1297 
1298  /* Tablesync should never receive prepare. */
1299  if (am_tablesync_worker())
1300  ereport(ERROR,
1301  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1302  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1303 
1304  logicalrep_read_stream_prepare(s, &prepare_data);
1305  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1306 
1307  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1308 
1309  switch (apply_action)
1310  {
1311  case TRANS_LEADER_APPLY:
1312 
1313  /*
1314  * The transaction has been serialized to file, so replay all the
1315  * spooled operations.
1316  */
1318  prepare_data.xid, prepare_data.prepare_lsn);
1319 
1320  /* Mark the transaction as prepared. */
1321  apply_handle_prepare_internal(&prepare_data);
1322 
1324 
1325  /*
1326  * It is okay not to set the local_end LSN for the prepare because
1327  * we always flush the prepare record. See apply_handle_prepare.
1328  */
1330 
1331  in_remote_transaction = false;
1332 
1333  /* Unlink the files with serialized changes and subxact info. */
1335 
1336  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1337  break;
1338 
1340  Assert(winfo);
1341 
1342  if (pa_send_data(winfo, s->len, s->data))
1343  {
1344  /* Finish processing the streaming transaction. */
1345  pa_xact_finish(winfo, prepare_data.end_lsn);
1346  break;
1347  }
1348 
1349  /*
1350  * Switch to serialize mode when we are not able to send the
1351  * change to parallel apply worker.
1352  */
1353  pa_switch_to_partial_serialize(winfo, true);
1354 
1355  /* fall through */
1357  Assert(winfo);
1358 
1359  stream_open_and_write_change(prepare_data.xid,
1361  &original_msg);
1362 
1364 
1365  /* Finish processing the streaming transaction. */
1366  pa_xact_finish(winfo, prepare_data.end_lsn);
1367  break;
1368 
1369  case TRANS_PARALLEL_APPLY:
1370 
1371  /*
1372  * If the parallel apply worker is applying spooled messages then
1373  * close the file before preparing.
1374  */
1375  if (stream_fd)
1377 
1379 
1380  /* Mark the transaction as prepared. */
1381  apply_handle_prepare_internal(&prepare_data);
1382 
1384 
1386 
1387  /*
1388  * It is okay not to set the local_end LSN for the prepare because
1389  * we always flush the prepare record. See apply_handle_prepare.
1390  */
1392 
1395 
1397 
1398  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1399  break;
1400 
1401  default:
1402  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1403  break;
1404  }
1405 
1406  pgstat_report_stat(false);
1407 
1408  /* Process any tables that are being synchronized in parallel. */
1409  process_syncing_tables(prepare_data.end_lsn);
1410 
1411  /*
1412  * Similar to prepare case, the subskiplsn could be left in a case of
1413  * server crash but it's okay. See the comments in apply_handle_prepare().
1414  */
1417 
1419 
1421 }
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert, begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1488 of file worker.c.

1489 {
1490  bool first_segment;
1491  ParallelApplyWorkerInfo *winfo;
1492  TransApplyAction apply_action;
1493 
1494  /* Save the message before it is consumed. */
1495  StringInfoData original_msg = *s;
1496 
1498  ereport(ERROR,
1499  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1500  errmsg_internal("duplicate STREAM START message")));
1501 
1502  /* There must not be an active streaming transaction. */
1504 
1505  /* notify handle methods we're processing a remote transaction */
1506  in_streamed_transaction = true;
1507 
1508  /* extract XID of the top-level transaction */
1509  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1510 
1512  ereport(ERROR,
1513  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1514  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1515 
1517 
1518  /* Try to allocate a worker for the streaming transaction. */
1519  if (first_segment)
1521 
1522  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1523 
1524  switch (apply_action)
1525  {
1527 
1528  /*
1529  * Function stream_start_internal starts a transaction. This
1530  * transaction will be committed on the stream stop unless it is a
1531  * tablesync worker in which case it will be committed after
1532  * processing all the messages. We need this transaction for
1533  * handling the BufFile, used for serializing the streaming data
1534  * and subxact info.
1535  */
1536  stream_start_internal(stream_xid, first_segment);
1537  break;
1538 
1540  Assert(winfo);
1541 
1542  /*
1543  * Once we start serializing the changes, the parallel apply
1544  * worker will wait for the leader to release the stream lock
1545  * until the end of the transaction. So, we don't need to release
1546  * the lock or increment the stream count in that case.
1547  */
1548  if (pa_send_data(winfo, s->len, s->data))
1549  {
1550  /*
1551  * Unlock the shared object lock so that the parallel apply
1552  * worker can continue to receive changes.
1553  */
1554  if (!first_segment)
1556 
1557  /*
1558  * Increment the number of streaming blocks waiting to be
1559  * processed by parallel apply worker.
1560  */
1562 
1563  /* Cache the parallel apply worker for this transaction. */
1565  break;
1566  }
1567 
1568  /*
1569  * Switch to serialize mode when we are not able to send the
1570  * change to parallel apply worker.
1571  */
1572  pa_switch_to_partial_serialize(winfo, !first_segment);
1573 
1574  /* fall through */
1576  Assert(winfo);
1577 
1578  /*
1579  * Open the spool file unless it was already opened when switching
1580  * to serialize mode. The transaction started in
1581  * stream_start_internal will be committed on the stream stop.
1582  */
1583  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1584  stream_start_internal(stream_xid, first_segment);
1585 
1587 
1588  /* Cache the parallel apply worker for this transaction. */
1590  break;
1591 
1592  case TRANS_PARALLEL_APPLY:
1593  if (first_segment)
1594  {
1595  /* Hold the lock until the end of the transaction. */
1598 
1599  /*
1600  * Signal the leader apply worker, as it may be waiting for
1601  * us.
1602  */
1604  }
1605 
1607  break;
1608 
1609  default:
1610  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1611  break;
1612  }
1613 
1615 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:316
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4384
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1450
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1647 of file worker.c.

1648 {
1649  ParallelApplyWorkerInfo *winfo;
1650  TransApplyAction apply_action;
1651 
1653  ereport(ERROR,
1654  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1655  errmsg_internal("STREAM STOP message without STREAM START")));
1656 
1657  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1658 
1659  switch (apply_action)
1660  {
1663  break;
1664 
1666  Assert(winfo);
1667 
1668  /*
1669  * Lock before sending the STREAM_STOP message so that the leader
1670  * can hold the lock first and the parallel apply worker will wait
1671  * for leader to release the lock. See Locking Considerations atop
1672  * applyparallelworker.c.
1673  */
1675 
1676  if (pa_send_data(winfo, s->len, s->data))
1677  {
1679  break;
1680  }
1681 
1682  /*
1683  * Switch to serialize mode when we are not able to send the
1684  * change to parallel apply worker.
1685  */
1686  pa_switch_to_partial_serialize(winfo, true);
1687 
1688  /* fall through */
1693  break;
1694 
1695  case TRANS_PARALLEL_APPLY:
1696  elog(DEBUG1, "applied %u changes in the streaming chunk",
1698 
1699  /*
1700  * By the time parallel apply worker is processing the changes in
1701  * the current streaming block, the leader apply worker may have
1702  * sent multiple streaming blocks. This can lead to parallel apply
1703  * worker start waiting even when there are more chunk of streams
1704  * in the queue. So, try to lock only if there is no message left
1705  * in the queue. See Locking Considerations atop
1706  * applyparallelworker.c.
1707  *
1708  * Note that here we have a race condition where we can start
1709  * waiting even when there are pending streaming chunks. This can
1710  * happen if the leader sends another streaming block and acquires
1711  * the stream lock again after the parallel apply worker checks
1712  * that there is no pending streaming block and before it actually
1713  * starts waiting on a lock. We can handle this case by not
1714  * allowing the leader to increment the stream block count during
1715  * the time parallel apply worker acquires the lock but it is not
1716  * clear whether that is worth the complexity.
1717  *
1718  * Now, if this missed chunk contains rollback to savepoint, then
1719  * there is a risk of deadlock which probably shouldn't happen
1720  * after restart.
1721  */
1723  break;
1724 
1725  default:
1726  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1727  break;
1728  }
1729 
1730  in_streamed_transaction = false;
1732 
1733  /*
1734  * The parallel apply worker could be in a transaction in which case we
1735  * need to report the state as STATE_IDLEINTRANSACTION.
1736  */
1739  else
1741 
1743 }
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1624
@ STATE_IDLEINTRANSACTION
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4982

References AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3236 of file worker.c.

3237 {
3238  bool cascade = false;
3239  bool restart_seqs = false;
3240  List *remote_relids = NIL;
3241  List *remote_rels = NIL;
3242  List *rels = NIL;
3243  List *part_rels = NIL;
3244  List *relids = NIL;
3245  List *relids_logged = NIL;
3246  ListCell *lc;
3247  LOCKMODE lockmode = AccessExclusiveLock;
3248 
3249  /*
3250  * Quick return if we are skipping data modification changes or handling
3251  * streamed transactions.
3252  */
3253  if (is_skipping_changes() ||
3255  return;
3256 
3258 
3259  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3260 
3261  foreach(lc, remote_relids)
3262  {
3263  LogicalRepRelId relid = lfirst_oid(lc);
3264  LogicalRepRelMapEntry *rel;
3265 
3266  rel = logicalrep_rel_open(relid, lockmode);
3267  if (!should_apply_changes_for_rel(rel))
3268  {
3269  /*
3270  * The relation can't become interesting in the middle of the
3271  * transaction so it's safe to unlock it.
3272  */
3273  logicalrep_rel_close(rel, lockmode);
3274  continue;
3275  }
3276 
3277  remote_rels = lappend(remote_rels, rel);
3279  rels = lappend(rels, rel->localrel);
3280  relids = lappend_oid(relids, rel->localreloid);
3282  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3283 
3284  /*
3285  * Truncate partitions if we got a message to truncate a partitioned
3286  * table.
3287  */
3288  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3289  {
3290  ListCell *child;
3291  List *children = find_all_inheritors(rel->localreloid,
3292  lockmode,
3293  NULL);
3294 
3295  foreach(child, children)
3296  {
3297  Oid childrelid = lfirst_oid(child);
3298  Relation childrel;
3299 
3300  if (list_member_oid(relids, childrelid))
3301  continue;
3302 
3303  /* find_all_inheritors already got lock */
3304  childrel = table_open(childrelid, NoLock);
3305 
3306  /*
3307  * Ignore temp tables of other backends. See similar code in
3308  * ExecuteTruncate().
3309  */
3310  if (RELATION_IS_OTHER_TEMP(childrel))
3311  {
3312  table_close(childrel, lockmode);
3313  continue;
3314  }
3315 
3317  rels = lappend(rels, childrel);
3318  part_rels = lappend(part_rels, childrel);
3319  relids = lappend_oid(relids, childrelid);
3320  /* Log this relation only if needed for logical decoding */
3321  if (RelationIsLogicallyLogged(childrel))
3322  relids_logged = lappend_oid(relids_logged, childrelid);
3323  }
3324  }
3325  }
3326 
3327  /*
3328  * Even if we used CASCADE on the upstream primary we explicitly default
3329  * to replaying changes without further cascading. This might be later
3330  * changeable with a user specified option.
3331  *
3332  * MySubscription->runasowner tells us whether we want to execute
3333  * replication actions as the subscription owner; the last argument to
3334  * TruncateGuts tells it whether we want to switch to the table owner.
3335  * Those are exactly opposite conditions.
3336  */
3337  ExecuteTruncateGuts(rels,
3338  relids,
3339  relids_logged,
3340  DROP_RESTRICT,
3341  restart_seqs,
3343  foreach(lc, remote_rels)
3344  {
3345  LogicalRepRelMapEntry *rel = lfirst(lc);
3346 
3348  }
3349  foreach(lc, part_rels)
3350  {
3351  Relation rel = lfirst(lc);
3352 
3353  table_close(rel, NoLock);
3354  }
3355 
3357 }
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2330
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1886

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 2954 of file worker.c.

2958 {
2959  EState *estate = edata->estate;
2960  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2961  ResultRelInfo *relinfo = edata->targetRelInfo;
2962  Relation parentrel = relinfo->ri_RelationDesc;
2963  ModifyTableState *mtstate;
2964  PartitionTupleRouting *proute;
2965  ResultRelInfo *partrelinfo;
2966  Relation partrel;
2967  TupleTableSlot *remoteslot_part;
2968  TupleConversionMap *map;
2969  MemoryContext oldctx;
2970  LogicalRepRelMapEntry *part_entry = NULL;
2971  AttrMap *attrmap = NULL;
2972 
2973  /* ModifyTableState is needed for ExecFindPartition(). */
2974  edata->mtstate = mtstate = makeNode(ModifyTableState);
2975  mtstate->ps.plan = NULL;
2976  mtstate->ps.state = estate;
2977  mtstate->operation = operation;
2978  mtstate->resultRelInfo = relinfo;
2979 
2980  /* ... as is PartitionTupleRouting. */
2981  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2982 
2983  /*
2984  * Find the partition to which the "search tuple" belongs.
2985  */
2986  Assert(remoteslot != NULL);
2988  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2989  remoteslot, estate);
2990  Assert(partrelinfo != NULL);
2991  partrel = partrelinfo->ri_RelationDesc;
2992 
2993  /*
2994  * Check for supported relkind. We need this since partitions might be of
2995  * unsupported relkinds; and the set of partitions can change, so checking
2996  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2997  */
2998  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3000  RelationGetRelationName(partrel));
3001 
3002  /*
3003  * To perform any of the operations below, the tuple must match the
3004  * partition's rowtype. Convert if needed or just copy, using a dedicated
3005  * slot to store the tuple in any case.
3006  */
3007  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3008  if (remoteslot_part == NULL)
3009  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3010  map = ExecGetRootToChildMap(partrelinfo, estate);
3011  if (map != NULL)
3012  {
3013  attrmap = map->attrMap;
3014  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3015  remoteslot_part);
3016  }
3017  else
3018  {
3019  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3020  slot_getallattrs(remoteslot_part);
3021  }
3022  MemoryContextSwitchTo(oldctx);
3023 
3024  /* Check if we can do the update or delete on the leaf partition. */
3025  if (operation == CMD_UPDATE || operation == CMD_DELETE)
3026  {
3027  part_entry = logicalrep_partition_open(relmapentry, partrel,
3028  attrmap);
3029  check_relation_updatable(part_entry);
3030  }
3031 
3032  switch (operation)
3033  {
3034  case CMD_INSERT:
3035  apply_handle_insert_internal(edata, partrelinfo,
3036  remoteslot_part);
3037  break;
3038 
3039  case CMD_DELETE:
3040  apply_handle_delete_internal(edata, partrelinfo,
3041  remoteslot_part,
3042  part_entry->localindexoid);
3043  break;
3044 
3045  case CMD_UPDATE:
3046 
3047  /*
3048  * For UPDATE, depending on whether or not the updated tuple
3049  * satisfies the partition's constraint, perform a simple UPDATE
3050  * of the partition or move the updated tuple into a different
3051  * suitable partition.
3052  */
3053  {
3054  TupleTableSlot *localslot;
3055  ResultRelInfo *partrelinfo_new;
3056  Relation partrel_new;
3057  bool found;
3058  EPQState epqstate;
3059  RepOriginId localorigin;
3060  TransactionId localxmin;
3061  TimestampTz localts;
3062 
3063  /* Get the matching local tuple from the partition. */
3064  found = FindReplTupleInLocalRel(edata, partrel,
3065  &part_entry->remoterel,
3066  part_entry->localindexoid,
3067  remoteslot_part, &localslot);
3068  if (!found)
3069  {
3070  TupleTableSlot *newslot = localslot;
3071 
3072  /* Store the new tuple for conflict reporting */
3073  slot_store_data(newslot, part_entry, newtup);
3074 
3075  /*
3076  * The tuple to be updated could not be found. Do nothing
3077  * except for emitting a log message.
3078  */
3079  ReportApplyConflict(estate, partrelinfo,
3081  remoteslot_part, NULL, newslot,
3083  InvalidRepOriginId, 0);
3084 
3085  return;
3086  }
3087 
3088  /*
3089  * Report the conflict if the tuple was modified by a
3090  * different origin.
3091  */
3092  if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
3093  localorigin != replorigin_session_origin)
3094  {
3095  TupleTableSlot *newslot;
3096 
3097  /* Store the new tuple for conflict reporting */
3098  newslot = table_slot_create(partrel, &estate->es_tupleTable);
3099  slot_store_data(newslot, part_entry, newtup);
3100 
3101  ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3102  remoteslot_part, localslot, newslot,
3103  InvalidOid, localxmin, localorigin,
3104  localts);
3105  }
3106 
3107  /*
3108  * Apply the update to the local tuple, putting the result in
3109  * remoteslot_part.
3110  */
3112  slot_modify_data(remoteslot_part, localslot, part_entry,
3113  newtup);
3114  MemoryContextSwitchTo(oldctx);
3115 
3116  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3117 
3118  /*
3119  * Does the updated tuple still satisfy the current
3120  * partition's constraint?
3121  */
3122  if (!partrel->rd_rel->relispartition ||
3123  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3124  false))
3125  {
3126  /*
3127  * Yes, so simply UPDATE the partition. We don't call
3128  * apply_handle_update_internal() here, which would
3129  * normally do the following work, to avoid repeating some
3130  * work already done above to find the local tuple in the
3131  * partition.
3132  */
3133  ExecOpenIndices(partrelinfo, true);
3134  InitConflictIndexes(partrelinfo);
3135 
3136  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3138  ACL_UPDATE);
3139  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3140  localslot, remoteslot_part);
3141  }
3142  else
3143  {
3144  /* Move the tuple into the new partition. */
3145 
3146  /*
3147  * New partition will be found using tuple routing, which
3148  * can only occur via the parent table. We might need to
3149  * convert the tuple to the parent's rowtype. Note that
3150  * this is the tuple found in the partition, not the
3151  * original search tuple received by this function.
3152  */
3153  if (map)
3154  {
3155  TupleConversionMap *PartitionToRootMap =
3157  RelationGetDescr(parentrel));
3158 
3159  remoteslot =
3160  execute_attr_map_slot(PartitionToRootMap->attrMap,
3161  remoteslot_part, remoteslot);
3162  }
3163  else
3164  {
3165  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3166  slot_getallattrs(remoteslot);
3167  }
3168 
3169  /* Find the new partition. */
3171  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3172  proute, remoteslot,
3173  estate);
3174  MemoryContextSwitchTo(oldctx);
3175  Assert(partrelinfo_new != partrelinfo);
3176  partrel_new = partrelinfo_new->ri_RelationDesc;
3177 
3178  /* Check that new partition also has supported relkind. */
3179  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3181  RelationGetRelationName(partrel_new));
3182 
3183  ExecOpenIndices(partrelinfo, false);
3184 
3185  /* DELETE old tuple found in the old partition. */
3186  EvalPlanQualSetSlot(&epqstate, localslot);
3188  ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3189 
3190  /* INSERT new tuple into the new partition. */
3191 
3192  /*
3193  * Convert the replacement tuple to match the destination
3194  * partition rowtype.
3195  */
3197  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3198  if (remoteslot_part == NULL)
3199  remoteslot_part = table_slot_create(partrel_new,
3200  &estate->es_tupleTable);
3201  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3202  if (map != NULL)
3203  {
3204  remoteslot_part = execute_attr_map_slot(map->attrMap,
3205  remoteslot,
3206  remoteslot_part);
3207  }
3208  else
3209  {
3210  remoteslot_part = ExecCopySlot(remoteslot_part,
3211  remoteslot);
3212  slot_getallattrs(remoteslot);
3213  }
3214  MemoryContextSwitchTo(oldctx);
3215  apply_handle_insert_internal(edata, partrelinfo_new,
3216  remoteslot_part);
3217  }
3218 
3219  ExecCloseIndices(partrelinfo);
3220  EvalPlanQualEnd(&epqstate);
3221  }
3222  break;
3223 
3224  default:
3225  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3226  break;
3227  }
3228 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:896
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:30
@ CT_UPDATE_MISSING
Definition: conflict.h:36
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1787
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1232
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
@ CMD_UPDATE
Definition: nodes.h:266
#define makeNode(_type_)
Definition: nodes.h:155
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:602
PartitionTupleRouting * proute
Definition: worker.c:216
ModifyTableState * mtstate
Definition: worker.c:215
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:674
CmdType operation
Definition: execnodes.h:1359
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1363
PlanState ps
Definition: execnodes.h:1358
Plan * plan
Definition: execnodes.h:1120
EState * state
Definition: execnodes.h:1122
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:588
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368

References ACL_DELETE, ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationDelete(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), InvalidOid, InvalidRepOriginId, InvalidTransactionId, LogicalRepRelMapEntry::localindexoid, LOG, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), slot_store_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2345 of file worker.c.

2346 {
2347  LogicalRepTyp typ;
2348 
2350  return;
2351 
2352  logicalrep_read_typ(s, &typ);
2353 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2542 of file worker.c.

2543 {
2544  LogicalRepRelMapEntry *rel;
2545  LogicalRepRelId relid;
2546  UserContext ucxt;
2547  ApplyExecutionData *edata;
2548  EState *estate;
2549  LogicalRepTupleData oldtup;
2550  LogicalRepTupleData newtup;
2551  bool has_oldtup;
2552  TupleTableSlot *remoteslot;
2553  RTEPermissionInfo *target_perminfo;
2554  MemoryContext oldctx;
2555  bool run_as_owner;
2556 
2557  /*
2558  * Quick return if we are skipping data modification changes or handling
2559  * streamed transactions.
2560  */
2561  if (is_skipping_changes() ||
2563  return;
2564 
2566 
2567  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2568  &newtup);
2569  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2570  if (!should_apply_changes_for_rel(rel))
2571  {
2572  /*
2573  * The relation can't become interesting in the middle of the
2574  * transaction so it's safe to unlock it.
2575  */
2578  return;
2579  }
2580 
2581  /* Set relation for error callback */
2583 
2584  /* Check if we can do the update. */
2586 
2587  /*
2588  * Make sure that any user-supplied code runs as the table owner, unless
2589  * the user has opted out of that behavior.
2590  */
2591  run_as_owner = MySubscription->runasowner;
2592  if (!run_as_owner)
2593  SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2594 
2595  /* Initialize the executor state. */
2596  edata = create_edata_for_relation(rel);
2597  estate = edata->estate;
2598  remoteslot = ExecInitExtraTupleSlot(estate,
2599  RelationGetDescr(rel->localrel),
2600  &TTSOpsVirtual);
2601 
2602  /*
2603  * Populate updatedCols so that per-column triggers can fire, and so
2604  * executor can correctly pass down indexUnchanged hint. This could
2605  * include more columns than were actually changed on the publisher
2606  * because the logical replication protocol doesn't contain that
2607  * information. But it would for example exclude columns that only exist
2608  * on the subscriber, since we are not touching those.
2609  */
2610  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2611  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2612  {
2614  int remoteattnum = rel->attrmap->attnums[i];
2615 
2616  if (!att->attisdropped && remoteattnum >= 0)
2617  {
2618  Assert(remoteattnum < newtup.ncols);
2619  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2620  target_perminfo->updatedCols =
2621  bms_add_member(target_perminfo->updatedCols,
2623  }
2624  }
2625 
2626  /* Build the search tuple. */
2628  slot_store_data(remoteslot, rel,
2629  has_oldtup ? &oldtup : &newtup);
2630  MemoryContextSwitchTo(oldctx);
2631 
2632  /* For a partitioned table, apply update to correct partition. */
2633  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2635  remoteslot, &newtup, CMD_UPDATE);
2636  else
2638  remoteslot, &newtup, rel->localindexoid);
2639 
2640  finish_edata(edata);
2641 
2642  /* Reset relation for error callback */
2644 
2645  if (!run_as_owner)
2646  RestoreUserContext(&ucxt);
2647 
2649 
2651 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2659
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
int i
Definition: isn.c:73
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:637
Bitmapset * updatedCols
Definition: parsenodes.h:1295
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2659 of file worker.c.

2664 {
2665  EState *estate = edata->estate;
2666  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2667  Relation localrel = relinfo->ri_RelationDesc;
2668  EPQState epqstate;
2669  TupleTableSlot *localslot;
2670  bool found;
2671  MemoryContext oldctx;
2672 
2673  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2674  ExecOpenIndices(relinfo, true);
2675 
2676  found = FindReplTupleInLocalRel(edata, localrel,
2677  &relmapentry->remoterel,
2678  localindexoid,
2679  remoteslot, &localslot);
2680 
2681  /*
2682  * Tuple found.
2683  *
2684  * Note this will fail if there are other conflicting unique indexes.
2685  */
2686  if (found)
2687  {
2688  RepOriginId localorigin;
2689  TransactionId localxmin;
2690  TimestampTz localts;
2691 
2692  /*
2693  * Report the conflict if the tuple was modified by a different
2694  * origin.
2695  */
2696  if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2697  localorigin != replorigin_session_origin)
2698  {
2699  TupleTableSlot *newslot;
2700 
2701  /* Store the new tuple for conflict reporting */
2702  newslot = table_slot_create(localrel, &estate->es_tupleTable);
2703  slot_store_data(newslot, relmapentry, newtup);
2704 
2706  remoteslot, localslot, newslot,
2707  InvalidOid, localxmin, localorigin, localts);
2708  }
2709 
2710  /* Process and store remote tuple in the slot */
2712  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2713  MemoryContextSwitchTo(oldctx);
2714 
2715  EvalPlanQualSetSlot(&epqstate, remoteslot);
2716 
2717  InitConflictIndexes(relinfo);
2718 
2719  /* Do the actual update. */
2721  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2722  remoteslot);
2723  }
2724  else
2725  {
2726  TupleTableSlot *newslot = localslot;
2727 
2728  /* Store the new tuple for conflict reporting */
2729  slot_store_data(newslot, relmapentry, newtup);
2730 
2731  /*
2732  * The tuple to be updated could not be found. Do nothing except for
2733  * emitting a log message.
2734  */
2735  ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2736  remoteslot, NULL, newslot,
2738  InvalidRepOriginId, 0);
2739  }
2740 
2741  /* Cleanup. */
2742  ExecCloseIndices(relinfo);
2743  EvalPlanQualEnd(&epqstate);
2744 }

References ACL_UPDATE, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), InvalidOid, InvalidRepOriginId, InvalidTransactionId, LOG, MemoryContextSwitchTo(), NIL, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_RelationDesc, slot_modify_data(), slot_store_data(), table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2022 of file worker.c.

2024 {
2025  int nchanges;
2026  char path[MAXPGPATH];
2027  char *buffer = NULL;
2028  MemoryContext oldcxt;
2029  ResourceOwner oldowner;
2030  int fileno;
2031  off_t offset;
2032 
2033  if (!am_parallel_apply_worker())
2035 
2036  /* Make sure we have an open transaction */
2038 
2039  /*
2040  * Allocate file handle and memory required to process all the messages in
2041  * TopTransactionContext to avoid them getting reset after each message is
2042  * processed.
2043  */
2045 
2046  /* Open the spool file for the committed/prepared transaction */
2048  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2049 
2050  /*
2051  * Make sure the file is owned by the toplevel transaction so that the
2052  * file will not be accidentally closed when aborting a subtransaction.
2053  */
2054  oldowner = CurrentResourceOwner;
2056 
2057  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2058 
2059  CurrentResourceOwner = oldowner;
2060 
2061  buffer = palloc(BLCKSZ);
2062 
2063  MemoryContextSwitchTo(oldcxt);
2064 
2065  remote_final_lsn = lsn;
2066 
2067  /*
2068  * Make sure the handle apply_dispatch methods are aware we're in a remote
2069  * transaction.
2070  */
2071  in_remote_transaction = true;
2073 
2075 
2076  /*
2077  * Read the entries one by one and pass them through the same logic as in
2078  * apply_dispatch.
2079  */
2080  nchanges = 0;
2081  while (true)
2082  {
2084  size_t nbytes;
2085  int len;
2086 
2088 
2089  /* read length of the on-disk record */
2090  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2091 
2092  /* have we reached end of the file? */
2093  if (nbytes == 0)
2094  break;
2095 
2096  /* do we have a correct length? */
2097  if (len <= 0)
2098  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2099  len, path);
2100 
2101  /* make sure we have sufficiently large buffer */
2102  buffer = repalloc(buffer, len);
2103 
2104  /* and finally read the data into the buffer */
2105  BufFileReadExact(stream_fd, buffer, len);
2106 
2107  BufFileTell(stream_fd, &fileno, &offset);
2108 
2109  /* init a stringinfo using the buffer and call apply_dispatch */
2110  initReadOnlyStringInfo(&s2, buffer, len);
2111 
2112  /* Ensure we are reading the data into our memory context. */
2114 
2115  apply_dispatch(&s2);
2116 
2118 
2119  MemoryContextSwitchTo(oldcxt);
2120 
2121  nchanges++;
2122 
2123  /*
2124  * It is possible the file has been closed because we have processed
2125  * the transaction end message like stream_commit in which case that
2126  * must be the last message.
2127  */
2128  if (!stream_fd)
2129  {
2130  ensure_last_message(stream_fileset, xid, fileno, offset);
2131  break;
2132  }
2133 
2134  if (nchanges % 1000 == 0)
2135  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2136  nchanges, path);
2137  }
2138 
2139  if (stream_fd)
2141 
2142  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2143  nchanges, path);
2144 
2145  return;
2146 }
MemoryContext ApplyMessageContext
Definition: worker.c:291
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4283
void apply_dispatch(StringInfo s)
Definition: worker.c:3364
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1990
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext TopTransactionContext
Definition: mcxt.c:154
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:130
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 3923 of file worker.c.

3924 {
3926  {
3927  /*
3928  * Don't stop the parallel apply worker as the leader will detect the
3929  * subscription parameter change and restart logical replication later
3930  * anyway. This also prevents the leader from reporting errors when
3931  * trying to communicate with a stopped parallel apply worker, which
3932  * would accidentally disable subscriptions if disable_on_error was
3933  * set.
3934  */
3935  return;
3936  }
3937 
3938  /*
3939  * Reset the last-start time for this apply worker so that the launcher
3940  * will restart it without waiting for wal_retrieve_retry_interval if the
3941  * subscription is still active, and so that we won't leak that hash table
3942  * entry if it isn't.
3943  */
3944  if (am_leader_apply_worker())
3946 
3947  proc_exit(0);
3948 }
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1081
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), and maybe_reread_subscription().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4784 of file worker.c.

4785 {
4786  int worker_slot = DatumGetInt32(main_arg);
4787 
4788  InitializingApplyWorker = true;
4789 
4790  SetupApplyOrSyncWorker(worker_slot);
4791 
4792  InitializingApplyWorker = false;
4793 
4794  run_apply_worker();
4795 
4796  proc_exit(0);
4797 }
bool InitializingApplyWorker
Definition: worker.c:319
static void run_apply_worker()
Definition: worker.c:4531
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4730
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 5113 of file worker.c.

5114 {
5115  if (isCommit && on_commit_wakeup_workers_subids != NIL)
5116  {
5117  ListCell *lc;
5118 
5119  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5120  foreach(lc, on_commit_wakeup_workers_subids)
5121  {
5122  Oid subid = lfirst_oid(lc);
5123  List *workers;
5124  ListCell *lc2;
5125 
5126  workers = logicalrep_workers_find(subid, true, false);
5127  foreach(lc2, workers)
5128  {
5129  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5130 
5132  }
5133  }
5134  LWLockRelease(LogicalRepWorkerLock);
5135  }
5136 
5137  /* The List storage will be reclaimed automatically in xact cleanup. */
5139 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:302
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:275
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 4283 of file worker.c.

4284 {
4285  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4286 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2501 of file worker.c.

2502 {
2503  /*
2504  * For partitioned tables, we only need to care if the target partition is
2505  * updatable (aka has PK or RI defined for it).
2506  */
2507  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2508  return;
2509 
2510  /* Updatable, no error. */
2511  if (rel->updatable)
2512  return;
2513 
2514  /*
2515  * We are in error mode so it's fine this is somewhat slow. It's better to
2516  * give user correct error.
2517  */
2519  {
2520  ereport(ERROR,
2521  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2522  errmsg("publisher did not send replica identity column "
2523  "expected by the logical replication target relation \"%s.%s\"",
2524  rel->remoterel.nspname, rel->remoterel.relname)));
2525  }
2526 
2527  ereport(ERROR,
2528  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2529  errmsg("logical replication target relation \"%s.%s\" has "
2530  "neither REPLICA IDENTITY index nor PRIMARY "
2531  "KEY and published relation does not have "
2532  "REPLICA IDENTITY FULL",
2533  rel->remoterel.nspname, rel->remoterel.relname)));
2534 }
#define OidIsValid(objectId)
Definition: c.h:775
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:851

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 4480 of file worker.c.

4481 {
4482  if (subxact_data.subxacts)
4484 
4485  subxact_data.subxacts = NULL;
4487  subxact_data.nsubxacts = 0;
4489 }
static ApplySubXactData subxact_data
Definition: worker.c:358
void pfree(void *pointer)
Definition: mcxt.c:1521
uint32 nsubxacts
Definition: worker.c:352
uint32 nsubxacts_max
Definition: worker.c:353
SubXactInfo * subxacts
Definition: worker.c:355
TransactionId subxact_last
Definition: worker.c:354

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 4911 of file worker.c.

4912 {
4913  Relation rel;
4914  Form_pg_subscription subform;
4915  HeapTuple tup;
4916  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4917  bool started_tx = false;
4918 
4920  return;
4921 
4922  if (!IsTransactionState())
4923  {
4925  started_tx = true;
4926  }
4927 
4928  /*
4929  * Protect subskiplsn of pg_subscription from being concurrently updated
4930  * while clearing it.
4931  */
4932  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4933  AccessShareLock);
4934 
4935  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4936 
4937  /* Fetch the existing tuple. */
4938  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4940 
4941  if (!HeapTupleIsValid(tup))
4942  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4943 
4944  subform = (Form_pg_subscription) GETSTRUCT(tup);
4945 
4946  /*
4947  * Clear the subskiplsn. If the user has already changed subskiplsn before
4948  * clearing it we don't update the catalog and the replication origin
4949  * state won't get advanced. So in the worst case, if the server crashes
4950  * before sending an acknowledgment of the flush position the transaction
4951  * will be sent again and the user needs to set subskiplsn again. We can
4952  * reduce the possibility by logging a replication origin WAL record to
4953  * advance the origin LSN instead but there is no way to advance the
4954  * origin timestamp and it doesn't seem to be worth doing anything about
4955  * it since it's a very rare case.
4956  */
4957  if (subform->subskiplsn == myskiplsn)
4958  {
4959  bool nulls[Natts_pg_subscription];
4960  bool replaces[Natts_pg_subscription];
4961  Datum values[Natts_pg_subscription];
4962 
4963  memset(values, 0, sizeof(values));
4964  memset(nulls, false, sizeof(nulls));
4965  memset(replaces, false, sizeof(replaces));
4966 
4967  /* reset subskiplsn */
4968  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4969  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4970 
4971  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4972  replaces);
4973  CatalogTupleUpdate(rel, &tup->t_self, tup);
4974 
4975  if (myskiplsn != finish_lsn)
4976  ereport(WARNING,
4977  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4978  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4979  LSN_FORMAT_ARGS(finish_lsn),
4980  LSN_FORMAT_ARGS(myskiplsn)));
4981  }
4982 
4983  heap_freetuple(tup);
4984  table_close(rel, NoLock);
4985 
4986  if (started_tx)
4988 }
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define likely(x)
Definition: c.h:310
int errdetail(const char *fmt,...)
Definition: elog.c:1203
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1073
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 650 of file worker.c.

651 {
652  ApplyExecutionData *edata;
653  EState *estate;
654  RangeTblEntry *rte;
655  List *perminfos = NIL;
656  ResultRelInfo *resultRelInfo;
657 
658  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
659  edata->targetRel = rel;
660 
661  edata->estate = estate = CreateExecutorState();
662 
663  rte = makeNode(RangeTblEntry);
664  rte->rtekind = RTE_RELATION;
665  rte->relid = RelationGetRelid(rel->localrel);
666  rte->relkind = rel->localrel->rd_rel->relkind;
667  rte->rellockmode = AccessShareLock;
668 
669  addRTEPermissionInfo(&perminfos, rte);
670 
671  ExecInitRangeTable(estate, list_make1(rte), perminfos);
672 
673  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
674 
675  /*
676  * Use Relation opened by logicalrep_rel_open() instead of opening it
677  * again.
678  */
679  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
680 
681  /*
682  * We put the ResultRelInfo in the es_opened_result_relations list, even
683  * though we don't populate the es_result_relations array. That's a bit
684  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
685  *
686  * ExecOpenIndices() is not called here either, each execution path doing
687  * an apply operation being responsible for that.
688  */
690  lappend(estate->es_opened_result_relations, resultRelInfo);
691 
692  estate->es_output_cid = GetCurrentCommandId(true);
693 
694  /* Prepare to catch AFTER triggers. */
696 
697  /* other fields of edata remain NULL for now */
698 
699  return edata;
700 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1194
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:728
EState * CreateExecutorState(void)
Definition: execUtils.c:88
void * palloc0(Size size)
Definition: mcxt.c:1347
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1017
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:505
List * es_opened_result_relations
Definition: execnodes.h:650
CommandId es_output_cid
Definition: execnodes.h:644
RTEKind rtekind
Definition: parsenodes.h:1047
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5021
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 4804 of file worker.c.

4805 {
4806  /*
4807  * Emit the error message, and recover from the error state to an idle
4808  * state
4809  */
4810  HOLD_INTERRUPTS();
4811 
4812  EmitErrorReport();
4814  FlushErrorState();
4815 
4817 
4818  /* Report the worker failed during either table synchronization or apply */
4820  !am_tablesync_worker());
4821 
4822  /* Disable the subscription */
4826 
4827  /* Ensure we remove no-longer-useful entry for worker's start time */
4828  if (am_leader_apply_worker())
4830 
4831  /* Notify the subscription has been disabled and exit */
4832  ereport(LOG,
4833  errmsg("subscription \"%s\" has been disabled because of an error",
4834  MySubscription->name));
4835 
4836  proc_exit(0);
4837 }
void EmitErrorReport(void)
Definition: elog.c:1687
void FlushErrorState(void)
Definition: elog.c:1867
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4855

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 1990 of file worker.c.

1992 {
1993  char path[MAXPGPATH];
1994  BufFile *fd;
1995  int last_fileno;
1996  off_t last_offset;
1997 
1999 
2001 
2003 
2004  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2005 
2006  BufFileSeek(fd, 0, 0, SEEK_END);
2007  BufFileTell(fd, &last_fileno, &last_offset);
2008 
2009  BufFileClose(fd);
2010 
2012 
2013  if (last_fileno != fileno || last_offset != offset)
2014  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2015  path);
2016 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2906 of file worker.c.

2911 {
2912  EState *estate = edata->estate;
2913  bool found;
2914 
2915  /*
2916  * Regardless of the top-level operation, we're performing a read here, so
2917  * check for SELECT privileges.
2918  */
2919  TargetPrivilegesCheck(localrel, ACL_SELECT);
2920 
2921  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2922 
2923  Assert(OidIsValid(localidxoid) ||
2924  (remoterel->replident == REPLICA_IDENTITY_FULL));
2925 
2926  if (OidIsValid(localidxoid))
2927  {
2928 #ifdef USE_ASSERT_CHECKING
2929  Relation idxrel = index_open(localidxoid, AccessShareLock);
2930 
2931  /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2932  Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2933  (remoterel->replident == REPLICA_IDENTITY_FULL &&
2935  edata->targetRel->attrmap)));
2936  index_close(idxrel, AccessShareLock);
2937 #endif
2938 
2939  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2941  remoteslot, *localslot);
2942  }
2943  else
2944  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2945  remoteslot, *localslot);
2946 
2947  return found;
2948 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2409
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo, AttrMap *attrmap)
Definition: relation.c:804

References AccessShareLock, ACL_SELECT, Assert, LogicalRepRelMapEntry::attrmap, BuildIndexInfo(), EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 707 of file worker.c.

708 {
709  EState *estate = edata->estate;
710 
711  /* Handle any queued AFTER triggers. */
712  AfterTriggerEndQuery(estate);
713 
714  /* Shut down tuple routing, if any was done. */
715  if (edata->proute)
716  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
717 
718  /*
719  * Cleanup. It might seem that we should call ExecCloseResultRelations()
720  * here, but we intentionally don't. It would close the rel we added to
721  * es_opened_result_relations above, which is wrong because we took no
722  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
723  * any other relations opened during execution.
724  */
725  ExecResetTupleTable(estate->es_tupleTable, false);
726  FreeExecutorState(estate);
727  pfree(edata);
728 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1278
void FreeExecutorState(EState *estate)
Definition: execUtils.c:189
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5041

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 3484 of file worker.c.

3486 {
3487  dlist_mutable_iter iter;
3488  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3489 
3491  *flush = InvalidXLogRecPtr;
3492 
3494  {
3495  FlushPosition *pos =
3496  dlist_container(FlushPosition, node, iter.cur);
3497 
3498  *write = pos->remote_end;
3499 
3500  if (pos->local_end <= local_flush)
3501  {
3502  *flush = pos->remote_end;
3503  dlist_delete(iter.cur);
3504  pfree(pos);
3505  }
3506  else
3507  {
3508  /*
3509  * Don't want to uselessly iterate over the rest of the list which
3510  * could potentially be long. Instead get the last element and
3511  * grab the write position from there.
3512  */
3513  pos = dlist_tail_element(FlushPosition, node,
3514  &lsn_mapping);
3515  *write = pos->remote_end;
3516  *have_pending_txes = true;
3517  return;
3518  }
3519  }
3520 
3521  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3522 }
static dlist_head lsn_mapping
Definition: worker.c:205
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:202
XLogRecPtr local_end
Definition: worker.c:201
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6498

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 5160 of file worker.c.

5161 {
5162  *winfo = NULL;
5163 
5165  {
5166  return TRANS_PARALLEL_APPLY;
5167  }
5168 
5169  /*
5170  * If we are processing this transaction using a parallel apply worker
5171  * then either we send the changes to the parallel worker or if the worker
5172  * is busy then serialize the changes to the file which will later be
5173  * processed by the parallel worker.
5174  */
5175  *winfo = pa_find_worker(xid);
5176 
5177  if (*winfo && (*winfo)->serialize_changes)
5178  {
5180  }
5181  else if (*winfo)
5182  {
5184  }
5185 
5186  /*
5187  * If there is no parallel worker involved to process this transaction
5188  * then we either directly apply the change or serialize it to a file
5189  * which will later be applied when the transaction finish message is
5190  * processed.
5191  */
5192  else if (in_streamed_transaction)
5193  {
5194  return TRANS_LEADER_SERIALIZE;
5195  }
5196  else
5197  {
5198  return TRANS_LEADER_APPLY;
5199  }
5200 }
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 557 of file worker.c.

558 {
559  TransactionId current_xid;
561  TransApplyAction apply_action;
562  StringInfoData original_msg;
563 
564  apply_action = get_transaction_apply_action(stream_xid, &winfo);
565 
566  /* not in streaming mode */
567  if (apply_action == TRANS_LEADER_APPLY)
568  return false;
569 
571 
572  /*
573  * The parallel apply worker needs the xid in this message to decide
574  * whether to define a savepoint, so save the original message that has
575  * not moved the cursor after the xid. We will serialize this message to a
576  * file in PARTIAL_SERIALIZE mode.
577  */
578  original_msg = *s;
579 
580  /*
581  * We should have received XID of the subxact as the first part of the
582  * message, so extract it.
583  */
584  current_xid = pq_getmsgint(s, 4);
585 
586  if (!TransactionIdIsValid(current_xid))
587  ereport(ERROR,
588  (errcode(ERRCODE_PROTOCOL_VIOLATION),
589  errmsg_internal("invalid transaction ID in streamed replication transaction")));
590 
591  switch (apply_action)
592  {
594  Assert(stream_fd);
595 
596  /* Add the new subxact to the array (unless already there). */
597  subxact_info_add(current_xid);
598 
599  /* Write the change to the current file */
601  return true;
602 
604  Assert(winfo);
605 
606  /*
607  * XXX The publisher side doesn't always send relation/type update
608  * messages after the streaming transaction, so also update the
609  * relation/type in leader apply worker. See function
610  * cleanup_rel_sync_cache.
611  */
612  if (pa_send_data(winfo, s->len, s->data))
613  return (action != LOGICAL_REP_MSG_RELATION &&
615 
616  /*
617  * Switch to serialize mode when we are not able to send the
618  * change to parallel apply worker.
619  */
620  pa_switch_to_partial_serialize(winfo, false);
621 
622  /* fall through */
624  stream_write_change(action, &original_msg);
625 
626  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
627  return (action != LOGICAL_REP_MSG_RELATION &&
629 
632 
633  /* Define a savepoint for a subxact if needed. */
634  pa_start_subtrans(current_xid, stream_xid);
635  return false;
636 
637  default:
638  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
639  return false; /* silence compiler warning */
640  }
641 }
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4198
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415

References generate_unaccent_rules::action, Assert, StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 4636 of file worker.c.

4637 {
4638  MemoryContext oldctx;
4639 
4640  /* Run as replica session replication role. */
4641  SetConfigOption("session_replication_role", "replica",
4643 
4644  /* Connect to our database. */
4647  0);
4648 
4649  /*
4650  * Set always-secure search path, so malicious users can't redirect user
4651  * code (e.g. pg_index.indexprs).
4652  */
4653  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4654 
4655  /* Load the subscription into persistent memory context. */
4657  "ApplyContext",
4661 
4663  if (!MySubscription)
4664  {
4665  ereport(LOG,
4666  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4668 
4669  /* Ensure we remove no-longer-useful entry for worker's start time */
4670  if (am_leader_apply_worker())
4672 
4673  proc_exit(0);
4674  }
4675 
4676  MySubscriptionValid = true;
4677  MemoryContextSwitchTo(oldctx);
4678 
4679  if (!MySubscription->enabled)
4680  {
4681  ereport(LOG,
4682  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4683  MySubscription->name)));
4684 
4686  }
4687 
4688  /* Setup synchronous commit according to the user's wishes */
4689  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4691 
4692  /*
4693  * Keep us informed about subscription or role changes. Note that the
4694  * role's superuser privilege can be revoked.
4695  */
4696  CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4698  (Datum) 0);
4699 
4702  (Datum) 0);
4703 
4704  if (am_tablesync_worker())
4705  ereport(LOG,
4706  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4709  else
4710  ereport(LOG,
4711  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4712  MySubscription->name)));
4713 
4715 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:4083
static void apply_worker_exit(void)
Definition: worker.c:3923
MemoryContext ApplyContext
Definition: worker.c:292
static bool MySubscriptionValid
Definition: worker.c:300
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:886
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4291
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
MemoryContext TopMemoryContext
Definition: mcxt.c:149
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4852 of file worker.c.

4853 {
4855 }
bool IsLogicalWorker(void)
Definition: worker.c:4843

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4843 of file worker.c.

4844 {
4845  return MyLogicalRepWorker != NULL;
4846 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3570 of file worker.c.

3571 {
3572  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3573  bool ping_sent = false;
3574  TimeLineID tli;
3575  ErrorContextCallback errcallback;
3576 
3577  /*
3578  * Init the ApplyMessageContext which we clean up after each replication
3579  * protocol message.
3580  */
3582  "ApplyMessageContext",
3584 
3585  /*
3586  * This memory context is used for per-stream data when the streaming mode
3587  * is enabled. This context is reset on each stream stop.
3588  */
3590  "LogicalStreamingContext",
3592 
3593  /* mark as idle, before starting to loop */
3595 
3596  /*
3597  * Push apply error context callback. Fields will be filled while applying
3598  * a change.
3599  */
3600  errcallback.callback = apply_error_callback;
3601  errcallback.previous = error_context_stack;
3602  error_context_stack = &errcallback;
3604 
3605  /* This outer loop iterates once per wait. */
3606  for (;;)
3607  {
3609  int rc;
3610  int len;
3611  char *buf = NULL;
3612  bool endofstream = false;
3613  long wait_time;
3614 
3616 
3618 
3620 
3621  if (len != 0)
3622  {
3623  /* Loop to process all available data (without blocking). */
3624  for (;;)
3625  {
3627 
3628  if (len == 0)
3629  {
3630  break;
3631  }
3632  else if (len < 0)
3633  {
3634  ereport(LOG,
3635  (errmsg("data stream from publisher has ended")));
3636  endofstream = true;
3637  break;
3638  }
3639  else
3640  {
3641  int c;
3642  StringInfoData s;
3643 
3644  if (ConfigReloadPending)
3645  {
3646  ConfigReloadPending = false;
3648  }
3649 
3650  /* Reset timeout. */
3651  last_recv_timestamp = GetCurrentTimestamp();
3652  ping_sent = false;
3653 
3654  /* Ensure we are reading the data into our memory context. */
3656 
3658 
3659  c = pq_getmsgbyte(&s);
3660 
3661  if (c == 'w')
3662  {
3663  XLogRecPtr start_lsn;
3664  XLogRecPtr end_lsn;
3665  TimestampTz send_time;
3666 
3667  start_lsn = pq_getmsgint64(&s);
3668  end_lsn = pq_getmsgint64(&s);
3669  send_time = pq_getmsgint64(&s);
3670 
3671  if (last_received < start_lsn)
3672  last_received = start_lsn;
3673 
3674  if (last_received < end_lsn)
3675  last_received = end_lsn;
3676 
3677  UpdateWorkerStats(last_received, send_time, false);
3678 
3679  apply_dispatch(&s);
3680  }
3681  else if (c == 'k')
3682  {
3683  XLogRecPtr end_lsn;
3685  bool reply_requested;
3686 
3687  end_lsn = pq_getmsgint64(&s);
3688  timestamp = pq_getmsgint64(&s);
3689  reply_requested = pq_getmsgbyte(&s);
3690 
3691  if (last_received < end_lsn)
3692  last_received = end_lsn;
3693 
3694  send_feedback(last_received, reply_requested, false);
3695  UpdateWorkerStats(last_received, timestamp, true);
3696  }
3697  /* other message types are purposefully ignored */
3698 
3700  }
3701 
3703  }
3704  }
3705 
3706  /* confirm all writes so far */
3707  send_feedback(last_received, false, false);
3708 
3710  {
3711  /*
3712  * If we didn't get any transactions for a while there might be
3713  * unconsumed invalidation messages in the queue, consume them
3714  * now.
3715  */
3718 
3719  /* Process any table synchronization changes. */
3720  process_syncing_tables(last_received);
3721  }
3722 
3723  /* Cleanup the memory. */
3726 
3727  /* Check if we need to exit the streaming loop. */
3728  if (endofstream)
3729  break;
3730 
3731  /*
3732  * Wait for more data or latch. If we have unflushed transactions,
3733  * wake up after WalWriterDelay to see if they've been flushed yet (in
3734  * which case we should send a feedback message). Otherwise, there's
3735  * no particular urgency about waking up unless we get data or a
3736  * signal.
3737  */
3738  if (!dlist_is_empty(&lsn_mapping))
3739  wait_time = WalWriterDelay;
3740  else
3741  wait_time = NAPTIME_PER_CYCLE;
3742 
3746  fd, wait_time,
3747  WAIT_EVENT_LOGICAL_APPLY_MAIN);
3748 
3749  if (rc & WL_LATCH_SET)
3750  {
3753  }
3754 
3755  if (ConfigReloadPending)
3756  {
3757  ConfigReloadPending = false;
3759  }
3760 
3761  if (rc & WL_TIMEOUT)
3762  {
3763  /*
3764  * We didn't receive anything new. If we haven't heard anything
3765  * from the server for more than wal_receiver_timeout / 2, ping
3766  * the server. Also, if it's been longer than
3767  * wal_receiver_status_interval since the last update we sent,
3768  * send a status update to the primary anyway, to report any
3769  * progress in applying WAL.
3770  */
3771  bool requestReply = false;
3772 
3773  /*
3774  * Check if time since last receive from primary has reached the
3775  * configured limit.
3776  */
3777  if (wal_receiver_timeout > 0)
3778  {
3780  TimestampTz timeout;
3781 
3782  timeout =
3783  TimestampTzPlusMilliseconds(last_recv_timestamp,
3785 
3786  if (now >= timeout)
3787  ereport(ERROR,
3788  (errcode(ERRCODE_CONNECTION_FAILURE),
3789  errmsg("terminating logical replication worker due to timeout")));
3790 
3791  /* Check to see if it's time for a ping. */
3792  if (!ping_sent)
3793  {
3794  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3795  (wal_receiver_timeout / 2));
3796  if (now >= timeout)
3797  {
3798  requestReply = true;
3799  ping_sent = true;
3800  }
3801  }
3802  }
3803 
3804  send_feedback(last_received, requestReply, requestReply);
3805 
3806  /*
3807  * Force reporting to ensure long idle periods don't lead to
3808  * arbitrarily delayed stats. Stats can only be reported outside
3809  * of (implicit or explicit) transactions. That shouldn't lead to
3810  * stats being delayed for long, because transactions are either
3811  * sent as a whole on commit or streamed. Streamed transactions
3812  * are spilled to disk and applied on commit.
3813  */
3814  if (!IsTransactionState())
3815  pgstat_report_stat(true);
3816  }
3817  }
3818 
3819  /* Pop the error context stack */
3820  error_context_stack = errcallback.previous;
3822 
3823  /* All done */
3825 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3554
#define NAPTIME_PER_CYCLE
Definition: worker.c:196
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:289
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3834
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:297
void apply_error_callback(void *arg)
Definition: worker.c:4992
static MemoryContext LogicalStreamingContext
Definition: worker.c:295
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
ErrorContextCallback * error_context_stack
Definition: elog.c:94
struct Latch * MyLatch
Definition: globals.c:62
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
static char * buf
Definition: pg_test_fsync.c:73
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
char * c
struct ErrorContextCallback * previous
Definition: elog.h:296
void(* callback)(void *arg)
Definition: elog.h:297
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_receiver_timeout
Definition: walreceiver.c:88
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:452
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:454
int WalWriterDelay
Definition: walwriter.c:71
uint32 TimeLineID
Definition: xlogdefs.h:59

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3954 of file worker.c.

3955 {
3956  MemoryContext oldctx;
3958  bool started_tx = false;
3959 
3960  /* When cache state is valid there is nothing to do here. */
3961  if (MySubscriptionValid)
3962  return;
3963 
3964  /* This function might be called inside or outside of transaction. */
3965  if (!IsTransactionState())
3966  {
3968  started_tx = true;
3969  }
3970 
3971  /* Ensure allocations in permanent context. */
3973 
3975 
3976  /*
3977  * Exit if the subscription was removed. This normally should not happen
3978  * as the worker gets killed during DROP SUBSCRIPTION.
3979  */
3980  if (!newsub)
3981  {
3982  ereport(LOG,
3983  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3984  MySubscription->name)));
3985 
3986  /* Ensure we remove no-longer-useful entry for worker's start time */
3987  if (am_leader_apply_worker())
3989 
3990  proc_exit(0);
3991  }
3992 
3993  /* Exit if the subscription was disabled. */
3994  if (!newsub->enabled)
3995  {
3996  ereport(LOG,
3997  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3998  MySubscription->name)));
3999 
4001  }
4002 
4003  /* !slotname should never happen when enabled is true. */
4004  Assert(newsub->slotname);
4005 
4006  /* two-phase cannot be altered while the worker is running */
4007  Assert(newsub->twophasestate == MySubscription->twophasestate);
4008 
4009  /*
4010  * Exit if any parameter that affects the remote connection was changed.
4011  * The launcher will start a new worker but note that the parallel apply
4012  * worker won't restart if the streaming option's value is changed from
4013  * 'parallel' to any other value or the server decides not to stream the
4014  * in-progress transaction.
4015  */
4016  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4017  strcmp(newsub->name, MySubscription->name) != 0 ||
4018  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4019  newsub->binary != MySubscription->binary ||
4020  newsub->stream != MySubscription->stream ||
4021  newsub->passwordrequired != MySubscription->passwordrequired ||
4022  strcmp(newsub->origin, MySubscription->origin) != 0 ||
4023  newsub->owner != MySubscription->owner ||
4024  !equal(newsub->publications, MySubscription->publications))
4025  {
4027  ereport(LOG,
4028  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4029  MySubscription->name)));
4030  else
4031  ereport(LOG,
4032  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4033  MySubscription->name)));
4034 
4036  }
4037 
4038  /*
4039  * Exit if the subscription owner's superuser privileges have been
4040  * revoked.
4041  */
4042  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4043  {
4045  ereport(LOG,
4046  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4047  MySubscription->name));
4048  else
4049  ereport(LOG,
4050  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4051  MySubscription->name));
4052 
4054  }
4055 
4056  /* Check for other changes that should never happen too. */
4057  if (newsub->dbid != MySubscription->dbid)
4058  {
4059  elog(ERROR, "subscription %u changed unexpectedly",
4061  }
4062 
4063  /* Clean old subscription info and switch to new one. */
4066 
4067  MemoryContextSwitchTo(oldctx);
4068 
4069  /* Change synchronous commit according to the user's wishes */
4070  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4072 
4073  if (started_tx)
4075 
4076  MySubscriptionValid = true;
4077 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser,