PostgreSQL Source Code git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  RetainDeadTuplesData
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define MIN_XID_ADVANCE_INTERVAL   100
 
#define MAX_XID_ADVANCE_INTERVAL   180000
 
#define is_skipping_changes()   (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct RetainDeadTuplesData RetainDeadTuplesData
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 
enum  RetainDeadTuplesPhase {
  RDT_GET_CANDIDATE_XID , RDT_REQUEST_PUBLISHER_STATUS , RDT_WAIT_FOR_PUBLISHER_STATUS , RDT_WAIT_FOR_LOCAL_FLUSH ,
  RDT_STOP_CONFLICT_INFO_RETENTION , RDT_RESUME_CONFLICT_INFO_RETENTION
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void maybe_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data, bool status_received)
 
static bool can_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data)
 
static void process_rdt_phase_transition (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void get_candidate_xid (RetainDeadTuplesData *rdt_data)
 
static void request_publisher_status (RetainDeadTuplesData *rdt_data)
 
static void wait_for_publisher_status (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void wait_for_local_flush (RetainDeadTuplesData *rdt_data)
 
static bool should_stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void resume_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static bool update_retention_status (bool active)
 
static void reset_retention_data_fields (RetainDeadTuplesData *rdt_data)
 
static void adjust_xid_advance_interval (RetainDeadTuplesData *rdt_data, bool new_xid_found)
 
static void apply_worker_exit (void)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static bool FindDeletedTupleInLocalRel (Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
static void replorigin_reset (int code, Datum arg)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionData * create_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static bool IsIndexUsableForFindingDeletedTuple (Oid localindexoid, TransactionId conflict_detection_xmin)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker (void)
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallback * apply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConn * LogRepWorkerWalRcvConn = NULL
 
Subscription * MySubscription = NULL
 
static bool MySubscriptionValid = false
 
static List * on_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFile * stream_fd = NULL
 
static XLogRecPtr last_flushpos = InvalidXLogRecPtr
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))

Definition at line 517 of file worker.c.

◆ MAX_XID_ADVANCE_INTERVAL

#define MAX_XID_ADVANCE_INTERVAL   180000

Definition at line 456 of file worker.c.

◆ MIN_XID_ADVANCE_INTERVAL

#define MIN_XID_ADVANCE_INTERVAL   100

Definition at line 455 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 299 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg

◆ ApplyExecutionData

typedef struct ApplyExecutionData ApplyExecutionData

◆ ApplySubXactData

typedef struct ApplySubXactData ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ RetainDeadTuplesData

typedef struct RetainDeadTuplesData RetainDeadTuplesData

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ RetainDeadTuplesPhase

Enumerator
RDT_GET_CANDIDATE_XID 
RDT_REQUEST_PUBLISHER_STATUS 
RDT_WAIT_FOR_PUBLISHER_STATUS 
RDT_WAIT_FOR_LOCAL_FLUSH 
RDT_STOP_CONFLICT_INFO_RETENTION 
RDT_RESUME_CONFLICT_INFO_RETENTION 

Definition at line 387 of file worker.c.

388{
RetainDeadTuplesPhase
Definition: worker.c:388
@ RDT_WAIT_FOR_PUBLISHER_STATUS
Definition: worker.c:391
@ RDT_RESUME_CONFLICT_INFO_RETENTION
Definition: worker.c:394
@ RDT_GET_CANDIDATE_XID
Definition: worker.c:389
@ RDT_REQUEST_PUBLISHER_STATUS
Definition: worker.c:390
@ RDT_WAIT_FOR_LOCAL_FLUSH
Definition: worker.c:392
@ RDT_STOP_CONFLICT_INFO_RETENTION
Definition: worker.c:393

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 369 of file worker.c.

370{
371 /* The action for non-streaming transactions. */
373
374 /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:370
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:375
@ TRANS_PARALLEL_APPLY
Definition: worker.c:378
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:376
@ TRANS_LEADER_APPLY
Definition: worker.c:372
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:377

Function Documentation

◆ adjust_xid_advance_interval()

static void adjust_xid_advance_interval ( RetainDeadTuplesData *  rdt_data,
bool  new_xid_found 
)
static

Definition at line 4955 of file worker.c.

4956{
4957 if (rdt_data->xid_advance_interval && !new_xid_found)
4958 {
4959 int max_interval = wal_receiver_status_interval
4962
4963 /*
4964 * No new transaction ID has been assigned since the last check, so
4965 * double the interval, but not beyond the maximum allowable value.
4966 */
4967 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4968 max_interval);
4969 }
4970 else if (rdt_data->xid_advance_interval &&
4972 {
4973 /*
4974 * Retention has been stopped, so double the interval-capped at a
4975 * maximum of 3 minutes. The wal_receiver_status_interval is
4976 * intentionally not used as an upper bound, since the likelihood of
4977 * retention resuming is lower than that of general activity resuming.
4978 */
4979 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4981 }
4982 else
4983 {
4984 /*
4985 * A new transaction ID was found or the interval is not yet
4986 * initialized, so set the interval to the minimum value.
4987 */
4989 }
4990
4991 /*
4992 * Ensure the wait time remains within the maximum retention time limit
4993 * when retention is active.
4994 */
4996 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4998}
#define MAX_XID_ADVANCE_INTERVAL
Definition: worker.c:456
#define MIN_XID_ADVANCE_INTERVAL
Definition: worker.c:455
Subscription * MySubscription
Definition: worker.c:479
#define Min(x, y)
Definition: c.h:1006
int xid_advance_interval
Definition: worker.c:445
int wal_receiver_status_interval
Definition: walreceiver.c:88

References MAX_XID_ADVANCE_INTERVAL, Subscription::maxretention, Min, MIN_XID_ADVANCE_INTERVAL, MySubscription, Subscription::retentionactive, wal_receiver_status_interval, and RetainDeadTuplesData::xid_advance_interval.

Referenced by get_candidate_xid().

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3775 of file worker.c.

3776{
3778 LogicalRepMsgType saved_command;
3779
3780 /*
3781 * Set the current command being applied. Since this function can be
3782 * called recursively when applying spooled changes, save the current
3783 * command.
3784 */
3785 saved_command = apply_error_callback_arg.command;
3787
3788 switch (action)
3789 {
3792 break;
3793
3796 break;
3797
3800 break;
3801
3804 break;
3805
3808 break;
3809
3812 break;
3813
3816 break;
3817
3820 break;
3821
3824 break;
3825
3827
3828 /*
3829 * Logical replication does not use generic logical messages yet.
3830 * Although, it could be used by other applications that use this
3831 * output plugin.
3832 */
3833 break;
3834
3837 break;
3838
3841 break;
3842
3845 break;
3846
3849 break;
3850
3853 break;
3854
3857 break;
3858
3861 break;
3862
3865 break;
3866
3869 break;
3870
3871 default:
3872 ereport(ERROR,
3873 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3874 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3875 }
3876
3877 /* Reset the current command */
3878 apply_error_callback_arg.command = saved_command;
3879}
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1518
static void apply_handle_type(StringInfo s)
Definition: worker.c:2586
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3647
static void apply_handle_update(StringInfo s)
Definition: worker.c:2790
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2390
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1405
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:459
static void apply_handle_delete(StringInfo s)
Definition: worker.c:3012
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1211
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1236
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:2071
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2563
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1331
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1457
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1885
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1666
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1265
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1725
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2633
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:325

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 6159 of file worker.c.

6160{
6162
6164 return;
6165
6166 Assert(errarg->origin_name);
6167
6168 if (errarg->rel == NULL)
6169 {
6170 if (!TransactionIdIsValid(errarg->remote_xid))
6171 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6172 errarg->origin_name,
6174 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6175 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6176 errarg->origin_name,
6178 errarg->remote_xid);
6179 else
6180 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6181 errarg->origin_name,
6183 errarg->remote_xid,
6184 LSN_FORMAT_ARGS(errarg->finish_lsn));
6185 }
6186 else
6187 {
6188 if (errarg->remote_attnum < 0)
6189 {
6190 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6191 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6192 errarg->origin_name,
6194 errarg->rel->remoterel.nspname,
6195 errarg->rel->remoterel.relname,
6196 errarg->remote_xid);
6197 else
6198 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6199 errarg->origin_name,
6201 errarg->rel->remoterel.nspname,
6202 errarg->rel->remoterel.relname,
6203 errarg->remote_xid,
6204 LSN_FORMAT_ARGS(errarg->finish_lsn));
6205 }
6206 else
6207 {
6208 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6209 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6210 errarg->origin_name,
6212 errarg->rel->remoterel.nspname,
6213 errarg->rel->remoterel.relname,
6214 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6215 errarg->remote_xid);
6216 else
6217 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6218 errarg->origin_name,
6220 errarg->rel->remoterel.nspname,
6221 errarg->rel->remoterel.relname,
6222 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6223 errarg->remote_xid,
6224 LSN_FORMAT_ARGS(errarg->finish_lsn));
6225 }
6226 }
6227}
#define errcontext
Definition: elog.h:198
Assert(PointerIsAligned(start, uint64))
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1212
TransactionId remote_xid
Definition: worker.c:330
XLogRecPtr finish_lsn
Definition: worker.c:331
LogicalRepRelMapEntry * rel
Definition: worker.c:326
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsValid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 1211 of file worker.c.

1212{
1213 LogicalRepBeginData begin_data;
1214
1215 /* There must not be an active streaming transaction. */
1217
1218 logicalrep_read_begin(s, &begin_data);
1219 set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1220
1221 remote_final_lsn = begin_data.final_lsn;
1222
1224
1225 in_remote_transaction = true;
1226
1228}
bool in_remote_transaction
Definition: worker.c:484
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:6231
static XLogRecPtr remote_final_lsn
Definition: worker.c:485
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:6021
static TransactionId stream_xid
Definition: worker.c:490
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:63
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert(), LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1265 of file worker.c.

1266{
1267 LogicalRepPreparedTxnData begin_data;
1268
1269 /* Tablesync should never receive prepare. */
1270 if (am_tablesync_worker())
1271 ereport(ERROR,
1272 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1273 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1274
1275 /* There must not be an active streaming transaction. */
1277
1278 logicalrep_read_begin_prepare(s, &begin_data);
1279 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1280
1281 remote_final_lsn = begin_data.prepare_lsn;
1282
1284
1285 in_remote_transaction = true;
1286
1288}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:134
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1236 of file worker.c.

1237{
1238 LogicalRepCommitData commit_data;
1239
1240 logicalrep_read_commit(s, &commit_data);
1241
1242 if (commit_data.commit_lsn != remote_final_lsn)
1243 ereport(ERROR,
1244 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1245 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1246 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1248
1249 apply_handle_commit_internal(&commit_data);
1250
1251 /*
1252 * Process any tables that are being synchronized in parallel, as well as
1253 * any newly added tables or sequences.
1254 */
1255 ProcessSyncingRelations(commit_data.end_lsn);
1256
1259}
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2503
static void reset_apply_error_context_info(void)
Definition: worker.c:6239
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:98
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition: syncutils.c:155

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), ProcessSyncingRelations(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData *  commit_data)
static

Definition at line 2503 of file worker.c.

2504{
2505 if (is_skipping_changes())
2506 {
2508
2509 /*
2510 * Start a new transaction to clear the subskiplsn, if not started
2511 * yet.
2512 */
2513 if (!IsTransactionState())
2515 }
2516
2517 if (IsTransactionState())
2518 {
2519 /*
2520 * The transaction is either non-empty or skipped, so we clear the
2521 * subskiplsn.
2522 */
2524
2525 /*
2526 * Update origin state so we can restart streaming from correct
2527 * position in case of crash.
2528 */
2531
2533
2534 if (IsTransactionBlock())
2535 {
2536 EndTransactionBlock(false);
2538 }
2539
2540 pgstat_report_stat(false);
2541
2543 }
2544 else
2545 {
2546 /* Process any invalidation messages that might have accumulated. */
2549 }
2550
2551 in_remote_transaction = false;
2552}
static void stop_skipping_changes(void)
Definition: worker.c:6048
#define is_skipping_changes()
Definition: worker.c:517
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:6070
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3939
void maybe_reread_subscription(void)
Definition: worker.c:5038
void AcceptInvalidationMessages(void)
Definition: inval.c:930
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:388
void StartTransactionCommand(void)
Definition: xact.c:3077
bool IsTransactionBlock(void)
Definition: xact.c:4989
void CommitTransactionCommand(void)
Definition: xact.c:3175
bool EndTransactionBlock(bool chain)
Definition: xact.c:4062
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:257

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1405 of file worker.c.

1406{
1408 char gid[GIDSIZE];
1409
1410 logicalrep_read_commit_prepared(s, &prepare_data);
1411 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1412
1413 /* Compute GID for two_phase transactions. */
1415 gid, sizeof(gid));
1416
1417 /* There is no transaction when COMMIT PREPARED is called */
1419
1420 /*
1421 * Update origin state so we can restart streaming from correct position
1422 * in case of crash.
1423 */
1426
1427 FinishPreparedTransaction(gid, true);
1430 pgstat_report_stat(false);
1431
1433 in_remote_transaction = false;
1434
1435 /*
1436 * Process any tables that are being synchronized in parallel, as well as
1437 * any newly added tables or sequences.
1438 */
1439 ProcessSyncingRelations(prepare_data.end_lsn);
1440
1442
1445}
static void begin_replication_step(void)
Definition: worker.c:726
static void end_replication_step(void)
Definition: worker.c:749
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:267
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2747
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1497
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), ProcessSyncingRelations(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 3012 of file worker.c.

3013{
3015 LogicalRepTupleData oldtup;
3016 LogicalRepRelId relid;
3017 UserContext ucxt;
3018 ApplyExecutionData *edata;
3019 EState *estate;
3020 TupleTableSlot *remoteslot;
3021 MemoryContext oldctx;
3022 bool run_as_owner;
3023
3024 /*
3025 * Quick return if we are skipping data modification changes or handling
3026 * streamed transactions.
3027 */
3028 if (is_skipping_changes() ||
3030 return;
3031
3033
3034 relid = logicalrep_read_delete(s, &oldtup);
3037 {
3038 /*
3039 * The relation can't become interesting in the middle of the
3040 * transaction so it's safe to unlock it.
3041 */
3044 return;
3045 }
3046
3047 /* Set relation for error callback */
3049
3050 /* Check if we can do the delete. */
3052
3053 /*
3054 * Make sure that any user-supplied code runs as the table owner, unless
3055 * the user has opted out of that behavior.
3056 */
3057 run_as_owner = MySubscription->runasowner;
3058 if (!run_as_owner)
3059 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3060
3061 /* Initialize the executor state. */
3062 edata = create_edata_for_relation(rel);
3063 estate = edata->estate;
3064 remoteslot = ExecInitExtraTupleSlot(estate,
3066 &TTSOpsVirtual);
3067
3068 /* Build the search tuple. */
3070 slot_store_data(remoteslot, rel, &oldtup);
3071 MemoryContextSwitchTo(oldctx);
3072
3073 /* For a partitioned table, apply delete to correct partition. */
3074 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3076 remoteslot, NULL, CMD_DELETE);
3077 else
3078 {
3079 ResultRelInfo *relinfo = edata->targetRelInfo;
3080
3081 ExecOpenIndices(relinfo, false);
3082 apply_handle_delete_internal(edata, relinfo,
3083 remoteslot, rel->localindexoid);
3084 ExecCloseIndices(relinfo);
3085 }
3086
3087 finish_edata(edata);
3088
3089 /* Reset relation for error callback */
3091
3092 if (!run_as_owner)
3093 RestoreUserContext(&ucxt);
3094
3096
3098}
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2749
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:870
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:681
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:777
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:3351
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:1017
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:928
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:3106
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:239
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:161
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2020
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:661
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:278
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:561
#define RelationGetDescr(relation)
Definition: rel.h:541
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:517
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:361
ResultRelInfo * targetRelInfo
Definition: worker.c:315
EState * estate
Definition: worker.c:312
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 3106 of file worker.c.

3110{
3111 EState *estate = edata->estate;
3112 Relation localrel = relinfo->ri_RelationDesc;
3113 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3114 EPQState epqstate;
3115 TupleTableSlot *localslot;
3116 ConflictTupleInfo conflicttuple = {0};
3117 bool found;
3118
3119 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3120
3121 /* Caller should have opened indexes already. */
3122 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3123 !localrel->rd_rel->relhasindex ||
3124 RelationGetIndexList(localrel) == NIL);
3125
3126 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3127 remoteslot, &localslot);
3128
3129 /* If found delete it. */
3130 if (found)
3131 {
3132 /*
3133 * Report the conflict if the tuple was modified by a different
3134 * origin.
3135 */
3136 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3137 &conflicttuple.origin, &conflicttuple.ts) &&
3138 conflicttuple.origin != replorigin_session_origin)
3139 {
3140 conflicttuple.slot = localslot;
3142 remoteslot, NULL,
3143 list_make1(&conflicttuple));
3144 }
3145
3146 EvalPlanQualSetSlot(&epqstate, localslot);
3147
3148 /* Do the actual delete. */
3150 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3151 }
3152 else
3153 {
3154 /*
3155 * The tuple to be deleted could not be found. Do nothing except for
3156 * emitting a log message.
3157 */
3158 ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3159 remoteslot, NULL, list_make1(&conflicttuple));
3160 }
3161
3162 /* Cleanup. */
3163 EvalPlanQualEnd(&epqstate);
3164}
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:3174
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2601
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition: conflict.c:104
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:63
@ CT_DELETE_MISSING
Definition: conflict.h:52
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:49
#define LOG
Definition: elog.h:31
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2718
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3182
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:289
RepOriginId replorigin_session_origin
Definition: origin.c:163
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4836
LogicalRepRelMapEntry * targetRel
Definition: worker.c:314
TimestampTz ts
Definition: conflict.h:78
RepOriginId origin
Definition: conflict.h:77
TransactionId xmin
Definition: conflict.h:75
TupleTableSlot * slot
Definition: conflict.h:71
Relation ri_RelationDesc
Definition: execnodes.h:480
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:486

References ACL_DELETE, Assert(), CT_DELETE_MISSING, CT_DELETE_ORIGIN_DIFFERS, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), GetTupleTransactionInfo(), list_make1, LOG, NIL, ConflictTupleInfo::origin, RelationData::rd_rel, RelationGetIndexList(), LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_IndexRelationDescs, ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ConflictTupleInfo::ts, and ConflictTupleInfo::xmin.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2633 of file worker.c.

2634{
2636 LogicalRepTupleData newtup;
2637 LogicalRepRelId relid;
2638 UserContext ucxt;
2639 ApplyExecutionData *edata;
2640 EState *estate;
2641 TupleTableSlot *remoteslot;
2642 MemoryContext oldctx;
2643 bool run_as_owner;
2644
2645 /*
2646 * Quick return if we are skipping data modification changes or handling
2647 * streamed transactions.
2648 */
2649 if (is_skipping_changes() ||
2651 return;
2652
2654
2655 relid = logicalrep_read_insert(s, &newtup);
2658 {
2659 /*
2660 * The relation can't become interesting in the middle of the
2661 * transaction so it's safe to unlock it.
2662 */
2665 return;
2666 }
2667
2668 /*
2669 * Make sure that any user-supplied code runs as the table owner, unless
2670 * the user has opted out of that behavior.
2671 */
2672 run_as_owner = MySubscription->runasowner;
2673 if (!run_as_owner)
2674 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2675
2676 /* Set relation for error callback */
2678
2679 /* Initialize the executor state. */
2680 edata = create_edata_for_relation(rel);
2681 estate = edata->estate;
2682 remoteslot = ExecInitExtraTupleSlot(estate,
2684 &TTSOpsVirtual);
2685
2686 /* Process and store remote tuple in the slot */
2688 slot_store_data(remoteslot, rel, &newtup);
2689 slot_fill_defaults(rel, estate, remoteslot);
2690 MemoryContextSwitchTo(oldctx);
2691
2692 /* For a partitioned table, insert the tuple into a partition. */
2693 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2695 remoteslot, NULL, CMD_INSERT);
2696 else
2697 {
2698 ResultRelInfo *relinfo = edata->targetRelInfo;
2699
2700 ExecOpenIndices(relinfo, false);
2701 apply_handle_insert_internal(edata, relinfo, remoteslot);
2702 ExecCloseIndices(relinfo);
2703 }
2704
2705 finish_edata(edata);
2706
2707 /* Reset relation for error callback */
2709
2710 if (!run_as_owner)
2711 RestoreUserContext(&ucxt);
2712
2714
2716}
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2724
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:959
@ CMD_INSERT
Definition: nodes.h:277
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:428

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2724 of file worker.c.

2727{
2728 EState *estate = edata->estate;
2729
2730 /* Caller should have opened indexes already. */
2731 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2732 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2734
2735 /* Caller will not have done this bit. */
2737 InitConflictIndexes(relinfo);
2738
2739 /* Do the insert. */
2741 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2742}
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:139
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:580

References ACL_INSERT, Assert(), ApplyExecutionData::estate, ExecSimpleRelationInsert(), InitConflictIndexes(), NIL, RelationData::rd_rel, RelationGetIndexList(), ResultRelInfo::ri_IndexRelationDescs, ResultRelInfo::ri_onConflictArbiterIndexes, ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1666 of file worker.c.

1667{
1668 /*
1669 * ORIGIN message can only come inside streaming transaction or inside
1670 * remote transaction and before any actual writes.
1671 */
1675 ereport(ERROR,
1676 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1677 errmsg_internal("ORIGIN message sent out of order")));
1678}
static bool in_streamed_transaction
Definition: worker.c:488

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1331 of file worker.c.

1332{
1333 LogicalRepPreparedTxnData prepare_data;
1334
1335 logicalrep_read_prepare(s, &prepare_data);
1336
1337 if (prepare_data.prepare_lsn != remote_final_lsn)
1338 ereport(ERROR,
1339 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1340 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1341 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1343
1344 /*
1345 * Unlike commit, here, we always prepare the transaction even though no
1346 * change has happened in this transaction or all changes are skipped. It
1347 * is done this way because at commit prepared time, we won't know whether
1348 * we have skipped preparing a transaction because of those reasons.
1349 *
1350 * XXX, We can optimize such that at commit prepared time, we first check
1351 * whether we have prepared the transaction or not but that doesn't seem
1352 * worthwhile because such cases shouldn't be common.
1353 */
1355
1356 apply_handle_prepare_internal(&prepare_data);
1357
1360 pgstat_report_stat(false);
1361
1362 /*
1363 * It is okay not to set the local_end LSN for the prepare because we
1364 * always flush the prepare record. So, we can send the acknowledgment of
1365 * the remote_end LSN as soon as prepare is finished.
1366 *
1367 * XXX For the sake of consistency with commit, we could have set it with
1368 * the LSN of prepare but as of now we don't track that value similar to
1369 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1370 * it.
1371 */
1373
1374 in_remote_transaction = false;
1375
1376 /*
1377 * Process any tables that are being synchronized in parallel, as well as
1378 * any newly added tables or sequences.
1379 */
1380 ProcessSyncingRelations(prepare_data.end_lsn);
1381
1382 /*
1383 * Since we have already prepared the transaction, in a case where the
1384 * server crashes before clearing the subskiplsn, it will be left but the
1385 * transaction won't be resent. But that's okay because it's a rare case
1386 * and the subskiplsn will be cleared when finishing the next transaction.
1387 */
1390
1393}
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1294
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:228
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, ProcessSyncingRelations(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1294 of file worker.c.

1295{
1296 char gid[GIDSIZE];
1297
1298 /*
1299 * Compute unique GID for two_phase transactions. We don't use GID of
1300 * prepared transaction sent by server as that can lead to deadlock when
1301 * we have multiple subscriptions from the same node pointing to publications on
1302 * the same node. See comments atop worker.c
1303 */
1305 gid, sizeof(gid));
1306
1307 /*
1308 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1309 * called within the PrepareTransactionBlock below.
1310 */
1311 if (!IsTransactionBlock())
1312 {
1314 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1315 }
1316
1317 /*
1318 * Update origin state so we can restart streaming from correct position
1319 * in case of crash.
1320 */
1321 replorigin_session_origin_lsn = prepare_data->end_lsn;
1323
1325}
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:4010
void BeginTransactionBlock(void)
Definition: xact.c:3942

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2563 of file worker.c.

2564{
2565 LogicalRepRelation *rel;
2566
2568 return;
2569
2570 rel = logicalrep_read_rel(s);
2572
2573 /* Also reset all entries in the partition map that refer to remoterel. */
2575}
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:698
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:584
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1457 of file worker.c.

1458{
1460 char gid[GIDSIZE];
1461
1462 logicalrep_read_rollback_prepared(s, &rollback_data);
1463 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1464
1465 /* Compute GID for two_phase transactions. */
1467 gid, sizeof(gid));
1468
1469 /*
1470 * It is possible that we haven't received prepare because it occurred
1471 * before walsender reached a consistent point or the two_phase was still
1472 * not enabled by that time, so in such cases, we need to skip rollback
1473 * prepared.
1474 */
1475 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1476 rollback_data.prepare_time))
1477 {
1478 /*
1479 * Update origin state so we can restart streaming from correct
1480 * position in case of crash.
1481 */
1484
1485 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1487 FinishPreparedTransaction(gid, false);
1490
1492 }
1493
1494 pgstat_report_stat(false);
1495
1496 /*
1497 * It is okay not to set the local_end LSN for the rollback of prepared
1498 * transaction because we always flush the WAL record for it. See
1499 * apply_handle_prepare.
1500 */
1502 in_remote_transaction = false;
1503
1504 /*
1505 * Process any tables that are being synchronized in parallel, as well as
1506 * any newly added tables or sequences.
1507 */
1509
1512}
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:325
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2688

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, ProcessSyncingRelations(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 2071 of file worker.c.

2072{
2073 TransactionId xid;
2074 TransactionId subxid;
2075 LogicalRepStreamAbortData abort_data;
2077 TransApplyAction apply_action;
2078
2079 /* Save the message before it is consumed. */
2080 StringInfoData original_msg = *s;
2081 bool toplevel_xact;
2082
2084 ereport(ERROR,
2085 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2086 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2087
2088 /* We receive abort information only when we can apply in parallel. */
2089 logicalrep_read_stream_abort(s, &abort_data,
2091
2092 xid = abort_data.xid;
2093 subxid = abort_data.subxid;
2094 toplevel_xact = (xid == subxid);
2095
2096 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2097
2098 apply_action = get_transaction_apply_action(xid, &winfo);
2099
2100 switch (apply_action)
2101 {
2102 case TRANS_LEADER_APPLY:
2103
2104 /*
2105 * We are in the leader apply worker and the transaction has been
2106 * serialized to file.
2107 */
2108 stream_abort_internal(xid, subxid);
2109
2110 elog(DEBUG1, "finished processing the STREAM ABORT command");
2111 break;
2112
2114 Assert(winfo);
2115
2116 /*
2117 * For the case of aborting the subtransaction, we increment the
2118 * number of streaming blocks and take the lock again before
2119 * sending the STREAM_ABORT to ensure that the parallel apply
2120 * worker will wait on the lock for the next set of changes after
2121 * processing the STREAM_ABORT message if it is not already
2122 * waiting for STREAM_STOP message.
2123 *
2124 * It is important to perform this locking before sending the
2125 * STREAM_ABORT message so that the leader can hold the lock first
2126 * and the parallel apply worker will wait for the leader to
2127 * release the lock. This is the same as what we do in
2128 * apply_handle_stream_stop. See Locking Considerations atop
2129 * applyparallelworker.c.
2130 */
2131 if (!toplevel_xact)
2132 {
2136 }
2137
2138 if (pa_send_data(winfo, s->len, s->data))
2139 {
2140 /*
2141 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2142 * wait here for the parallel apply worker to finish as that
2143 * is not required to maintain the commit order and won't have
2144 * the risk of failures due to transaction dependencies and
2145 * deadlocks. However, it is possible that before the parallel
2146 * worker finishes and we clear the worker info, the xid
2147 * wraparound happens on the upstream and a new transaction
2148 * with the same xid can appear and that can lead to duplicate
2149 * entries in ParallelApplyTxnHash. Yet another problem could
2150 * be that we may have serialized the changes in partial
2151 * serialize mode and the file containing xact changes may
2152 * already exist, and after xid wraparound trying to create
2153 * the file for the same xid can lead to an error. To avoid
2154 * these problems, we decide to wait for the aborts to finish.
2155 *
2156 * Note, it is okay to not update the flush location position
2157 * for aborts as in worst case that means such a transaction
2158 * won't be sent again after restart.
2159 */
2160 if (toplevel_xact)
2162
2163 break;
2164 }
2165
2166 /*
2167 * Switch to serialize mode when we are not able to send the
2168 * change to parallel apply worker.
2169 */
2170 pa_switch_to_partial_serialize(winfo, true);
2171
2172 /* fall through */
2174 Assert(winfo);
2175
2176 /*
2177 * Parallel apply worker might have applied some changes, so write
2178 * the STREAM_ABORT message so that it can rollback the
2179 * subtransaction if needed.
2180 */
2182 &original_msg);
2183
2184 if (toplevel_xact)
2185 {
2188 }
2189 break;
2190
2192
2193 /*
2194 * If the parallel apply worker is applying spooled messages then
2195 * close the file before aborting.
2196 */
2197 if (toplevel_xact && stream_fd)
2199
2200 pa_stream_abort(&abort_data);
2201
2202 /*
2203 * We need to wait after processing rollback to savepoint for the
2204 * next set of changes.
2205 *
2206 * We have a race condition here due to which we can start waiting
2207 * here when there are more chunks of streams in the queue. See
2208 * apply_handle_stream_stop.
2209 */
2210 if (!toplevel_xact)
2212
2213 elog(DEBUG1, "finished processing the STREAM ABORT command");
2214 break;
2215
2216 default:
2217 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2218 break;
2219 }
2220
2222}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:422
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:6316
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:5498
static BufFile * stream_fd
Definition: worker.c:520
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1988
static void stream_close_file(void)
Definition: worker.c:5450
uint32 TransactionId
Definition: c.h:660
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1187
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2390 of file worker.c.

2391{
2392 TransactionId xid;
2393 LogicalRepCommitData commit_data;
2395 TransApplyAction apply_action;
2396
2397 /* Save the message before it is consumed. */
2398 StringInfoData original_msg = *s;
2399
2401 ereport(ERROR,
2402 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2403 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2404
2405 xid = logicalrep_read_stream_commit(s, &commit_data);
2406 set_apply_error_context_xact(xid, commit_data.commit_lsn);
2407
2408 apply_action = get_transaction_apply_action(xid, &winfo);
2409
2410 switch (apply_action)
2411 {
2412 case TRANS_LEADER_APPLY:
2413
2414 /*
2415 * The transaction has been serialized to file, so replay all the
2416 * spooled operations.
2417 */
2419 commit_data.commit_lsn);
2420
2421 apply_handle_commit_internal(&commit_data);
2422
2423 /* Unlink the files with serialized changes and subxact info. */
2425
2426 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2427 break;
2428
2430 Assert(winfo);
2431
2432 if (pa_send_data(winfo, s->len, s->data))
2433 {
2434 /* Finish processing the streaming transaction. */
2435 pa_xact_finish(winfo, commit_data.end_lsn);
2436 break;
2437 }
2438
2439 /*
2440 * Switch to serialize mode when we are not able to send the
2441 * change to parallel apply worker.
2442 */
2443 pa_switch_to_partial_serialize(winfo, true);
2444
2445 /* fall through */
2447 Assert(winfo);
2448
2450 &original_msg);
2451
2453
2454 /* Finish processing the streaming transaction. */
2455 pa_xact_finish(winfo, commit_data.end_lsn);
2456 break;
2457
2459
2460 /*
2461 * If the parallel apply worker is applying spooled messages then
2462 * close the file before committing.
2463 */
2464 if (stream_fd)
2466
2467 apply_handle_commit_internal(&commit_data);
2468
2470
2471 /*
2472 * It is important to set the transaction state as finished before
2473 * releasing the lock. See pa_wait_for_xact_finish.
2474 */
2477
2479
2480 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2481 break;
2482
2483 default:
2484 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2485 break;
2486 }
2487
2488 /*
2489 * Process any tables that are being synchronized in parallel, as well as
2490 * any newly added tables or sequences.
2491 */
2492 ProcessSyncingRelations(commit_data.end_lsn);
2493
2495
2497}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:5381
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2260
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1132
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert(), LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), ProcessSyncingRelations(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1518 of file worker.c.

1519{
1520 LogicalRepPreparedTxnData prepare_data;
1522 TransApplyAction apply_action;
1523
1524 /* Save the message before it is consumed. */
1525 StringInfoData original_msg = *s;
1526
1528 ereport(ERROR,
1529 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1530 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1531
1532 /* Tablesync should never receive prepare. */
1533 if (am_tablesync_worker())
1534 ereport(ERROR,
1535 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1536 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1537
1538 logicalrep_read_stream_prepare(s, &prepare_data);
1539 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1540
1541 apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1542
1543 switch (apply_action)
1544 {
1545 case TRANS_LEADER_APPLY:
1546
1547 /*
1548 * The transaction has been serialized to file, so replay all the
1549 * spooled operations.
1550 */
1552 prepare_data.xid, prepare_data.prepare_lsn);
1553
1554 /* Mark the transaction as prepared. */
1555 apply_handle_prepare_internal(&prepare_data);
1556
1558
1559 /*
1560 * It is okay not to set the local_end LSN for the prepare because
1561 * we always flush the prepare record. See apply_handle_prepare.
1562 */
1564
1565 in_remote_transaction = false;
1566
1567 /* Unlink the files with serialized changes and subxact info. */
1569
1570 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1571 break;
1572
1574 Assert(winfo);
1575
1576 if (pa_send_data(winfo, s->len, s->data))
1577 {
1578 /* Finish processing the streaming transaction. */
1579 pa_xact_finish(winfo, prepare_data.end_lsn);
1580 break;
1581 }
1582
1583 /*
1584 * Switch to serialize mode when we are not able to send the
1585 * change to parallel apply worker.
1586 */
1587 pa_switch_to_partial_serialize(winfo, true);
1588
1589 /* fall through */
1591 Assert(winfo);
1592
1593 stream_open_and_write_change(prepare_data.xid,
1595 &original_msg);
1596
1598
1599 /* Finish processing the streaming transaction. */
1600 pa_xact_finish(winfo, prepare_data.end_lsn);
1601 break;
1602
1604
1605 /*
1606 * If the parallel apply worker is applying spooled messages then
1607 * close the file before preparing.
1608 */
1609 if (stream_fd)
1611
1613
1614 /* Mark the transaction as prepared. */
1615 apply_handle_prepare_internal(&prepare_data);
1616
1618
1620
1621 /*
1622 * It is okay not to set the local_end LSN for the prepare because
1623 * we always flush the prepare record. See apply_handle_prepare.
1624 */
1626
1629
1631
1632 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1633 break;
1634
1635 default:
1636 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1637 break;
1638 }
1639
1640 pgstat_report_stat(false);
1641
1642 /*
1643 * Process any tables that are being synchronized in parallel, as well as
1644 * any newly added tables or sequences.
1645 */
1646 ProcessSyncingRelations(prepare_data.end_lsn);
1647
1648 /*
1649 * Similar to the prepare case, the subskiplsn could be left behind in case
1650 * of a server crash, but that's okay. See the comments in apply_handle_prepare().
1651 */
1654
1656
1658}
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:365

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, ProcessSyncingRelations(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1725 of file worker.c.

1726{
1727 bool first_segment;
1729 TransApplyAction apply_action;
1730
1731 /* Save the message before it is consumed. */
1732 StringInfoData original_msg = *s;
1733
1735 ereport(ERROR,
1736 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1737 errmsg_internal("duplicate STREAM START message")));
1738
1739 /* There must not be an active streaming transaction. */
1741
1742 /* notify handle methods we're processing a remote transaction */
1744
1745 /* extract XID of the top-level transaction */
1746 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1747
1749 ereport(ERROR,
1750 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1751 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1752
1754
1755 /* Try to allocate a worker for the streaming transaction. */
1756 if (first_segment)
1758
1759 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1760
1761 switch (apply_action)
1762 {
1764
1765 /*
1766 * Function stream_start_internal starts a transaction. This
1767 * transaction will be committed on the stream stop unless it is a
1768 * tablesync worker in which case it will be committed after
1769 * processing all the messages. We need this transaction for
1770 * handling the BufFile, used for serializing the streaming data
1771 * and subxact info.
1772 */
1773 stream_start_internal(stream_xid, first_segment);
1774 break;
1775
1777 Assert(winfo);
1778
1779 /*
1780 * Once we start serializing the changes, the parallel apply
1781 * worker will wait for the leader to release the stream lock
1782 * until the end of the transaction. So, we don't need to release
1783 * the lock or increment the stream count in that case.
1784 */
1785 if (pa_send_data(winfo, s->len, s->data))
1786 {
1787 /*
1788 * Unlock the shared object lock so that the parallel apply
1789 * worker can continue to receive changes.
1790 */
1791 if (!first_segment)
1793
1794 /*
1795 * Increment the number of streaming blocks waiting to be
1796 * processed by parallel apply worker.
1797 */
1799
1800 /* Cache the parallel apply worker for this transaction. */
1802 break;
1803 }
1804
1805 /*
1806 * Switch to serialize mode when we are not able to send the
1807 * change to parallel apply worker.
1808 */
1809 pa_switch_to_partial_serialize(winfo, !first_segment);
1810
1811 /* fall through */
1813 Assert(winfo);
1814
1815 /*
1816 * Open the spool file unless it was already opened when switching
1817 * to serialize mode. The transaction started in
1818 * stream_start_internal will be committed on the stream stop.
1819 */
1820 if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1821 stream_start_internal(stream_xid, first_segment);
1822
1824
1825 /* Cache the parallel apply worker for this transaction. */
1827 break;
1828
1830 if (first_segment)
1831 {
1832 /* Hold the lock until the end of the transaction. */
1835
1836 /*
1837 * Signal the leader apply worker, as it may be waiting for
1838 * us.
1839 */
1842 }
1843
1845 break;
1846
1847 default:
1848 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1849 break;
1850 }
1851
1853}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:496
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:5468
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1687
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:723
#define InvalidOid
Definition: postgres_ext.h:37
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1082
@ PARALLEL_TRANS_STARTED
@ WORKERTYPE_APPLY

References AccessExclusiveLock, Assert(), StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, WORKERTYPE_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1885 of file worker.c.

1886{
1888 TransApplyAction apply_action;
1889
1891 ereport(ERROR,
1892 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1893 errmsg_internal("STREAM STOP message without STREAM START")));
1894
1895 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1896
1897 switch (apply_action)
1898 {
1901 break;
1902
1904 Assert(winfo);
1905
1906 /*
1907 * Lock before sending the STREAM_STOP message so that the leader
1908 * can hold the lock first and the parallel apply worker will wait
1909 * for the leader to release the lock. See Locking Considerations atop
1910 * applyparallelworker.c.
1911 */
1913
1914 if (pa_send_data(winfo, s->len, s->data))
1915 {
1917 break;
1918 }
1919
1920 /*
1921 * Switch to serialize mode when we are not able to send the
1922 * change to parallel apply worker.
1923 */
1924 pa_switch_to_partial_serialize(winfo, true);
1925
1926 /* fall through */
1931 break;
1932
1934 elog(DEBUG1, "applied %u changes in the streaming chunk",
1936
1937 /*
1938 * By the time parallel apply worker is processing the changes in
1939 * the current streaming block, the leader apply worker may have
1940 * sent multiple streaming blocks. This can lead to the parallel apply
1941 * worker starting to wait even when there are more streaming chunks
1942 * in the queue. So, try to lock only if there is no message left
1943 * in the queue. See Locking Considerations atop
1944 * applyparallelworker.c.
1945 *
1946 * Note that here we have a race condition where we can start
1947 * waiting even when there are pending streaming chunks. This can
1948 * happen if the leader sends another streaming block and acquires
1949 * the stream lock again after the parallel apply worker checks
1950 * that there is no pending streaming block and before it actually
1951 * starts waiting on a lock. We can handle this case by not
1952 * allowing the leader to increment the stream block count during
1953 * the time parallel apply worker acquires the lock but it is not
1954 * clear whether that is worth the complexity.
1955 *
1956 * Now, if this missed chunk contains rollback to savepoint, then
1957 * there is a risk of deadlock which probably shouldn't happen
1958 * after restart.
1959 */
1961 break;
1962
1963 default:
1964 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1965 break;
1966 }
1967
1970
1971 /*
1972 * The parallel apply worker could be in a transaction in which case we
1973 * need to report the state as STATE_IDLEINTRANSACTION.
1974 */
1977 else
1979
1981}
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1862
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007

References AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3647 of file worker.c.

3648{
3649 bool cascade = false;
3650 bool restart_seqs = false;
3651 List *remote_relids = NIL;
3652 List *remote_rels = NIL;
3653 List *rels = NIL;
3654 List *part_rels = NIL;
3655 List *relids = NIL;
3656 List *relids_logged = NIL;
3657 ListCell *lc;
3658 LOCKMODE lockmode = AccessExclusiveLock;
3659
3660 /*
3661 * Quick return if we are skipping data modification changes or handling
3662 * streamed transactions.
3663 */
3664 if (is_skipping_changes() ||
3666 return;
3667
3669
3670 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3671
3672 foreach(lc, remote_relids)
3673 {
3674 LogicalRepRelId relid = lfirst_oid(lc);
3676
3677 rel = logicalrep_rel_open(relid, lockmode);
3679 {
3680 /*
3681 * The relation can't become interesting in the middle of the
3682 * transaction so it's safe to unlock it.
3683 */
3684 logicalrep_rel_close(rel, lockmode);
3685 continue;
3686 }
3687
3688 remote_rels = lappend(remote_rels, rel);
3690 rels = lappend(rels, rel->localrel);
3691 relids = lappend_oid(relids, rel->localreloid);
3693 relids_logged = lappend_oid(relids_logged, rel->localreloid);
3694
3695 /*
3696 * Truncate partitions if we got a message to truncate a partitioned
3697 * table.
3698 */
3699 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3700 {
3701 ListCell *child;
3702 List *children = find_all_inheritors(rel->localreloid,
3703 lockmode,
3704 NULL);
3705
3706 foreach(child, children)
3707 {
3708 Oid childrelid = lfirst_oid(child);
3709 Relation childrel;
3710
3711 if (list_member_oid(relids, childrelid))
3712 continue;
3713
3714 /* find_all_inheritors already got lock */
3715 childrel = table_open(childrelid, NoLock);
3716
3717 /*
3718 * Ignore temp tables of other backends. See similar code in
3719 * ExecuteTruncate().
3720 */
3721 if (RELATION_IS_OTHER_TEMP(childrel))
3722 {
3723 table_close(childrel, lockmode);
3724 continue;
3725 }
3726
3728 rels = lappend(rels, childrel);
3729 part_rels = lappend(part_rels, childrel);
3730 relids = lappend_oid(relids, childrelid);
3731 /* Log this relation only if needed for logical decoding */
3732 if (RelationIsLogicallyLogged(childrel))
3733 relids_logged = lappend_oid(relids_logged, childrelid);
3734 }
3735 }
3736 }
3737
3738 /*
3739 * Even if we used CASCADE on the upstream primary we explicitly default
3740 * to replaying changes without further cascading. This might later be
3741 * made changeable with a user-specified option.
3742 *
3743 * MySubscription->runasowner tells us whether we want to execute
3744 * replication actions as the subscription owner; the last argument to
3745 * TruncateGuts tells it whether we want to switch to the table owner.
3746 * Those are exactly opposite conditions.
3747 */
3749 relids,
3750 relids_logged,
3752 restart_seqs,
3754 foreach(lc, remote_rels)
3755 {
3756 LogicalRepRelMapEntry *rel = lfirst(lc);
3757
3759 }
3760 foreach(lc, part_rels)
3761 {
3762 Relation rel = lfirst(lc);
3763
3764 table_close(rel, NoLock);
3765 }
3766
3768}
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2398
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:32
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:615
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:711
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:668
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1975

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 3351 of file worker.c.

3355{
3356 EState *estate = edata->estate;
3357 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3358 ResultRelInfo *relinfo = edata->targetRelInfo;
3359 Relation parentrel = relinfo->ri_RelationDesc;
3360 ModifyTableState *mtstate;
3361 PartitionTupleRouting *proute;
3362 ResultRelInfo *partrelinfo;
3363 Relation partrel;
3364 TupleTableSlot *remoteslot_part;
3365 TupleConversionMap *map;
3366 MemoryContext oldctx;
3367 LogicalRepRelMapEntry *part_entry = NULL;
3368 AttrMap *attrmap = NULL;
3369
3370 /* ModifyTableState is needed for ExecFindPartition(). */
3371 edata->mtstate = mtstate = makeNode(ModifyTableState);
3372 mtstate->ps.plan = NULL;
3373 mtstate->ps.state = estate;
3374 mtstate->operation = operation;
3375 mtstate->resultRelInfo = relinfo;
3376
3377 /* ... as is PartitionTupleRouting. */
3378 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3379
3380 /*
3381 * Find the partition to which the "search tuple" belongs.
3382 */
3383 Assert(remoteslot != NULL);
3385 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3386 remoteslot, estate);
3387 Assert(partrelinfo != NULL);
3388 partrel = partrelinfo->ri_RelationDesc;
3389
3390 /*
3391 * Check for supported relkind. We need this since partitions might be of
3392 * unsupported relkinds; and the set of partitions can change, so checking
3393 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3394 */
3395 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3396 relmapentry->remoterel.relkind,
3398 RelationGetRelationName(partrel));
3399
3400 /*
3401 * To perform any of the operations below, the tuple must match the
3402 * partition's rowtype. Convert if needed or just copy, using a dedicated
3403 * slot to store the tuple in any case.
3404 */
3405 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3406 if (remoteslot_part == NULL)
3407 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3408 map = ExecGetRootToChildMap(partrelinfo, estate);
3409 if (map != NULL)
3410 {
3411 attrmap = map->attrMap;
3412 remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3413 remoteslot_part);
3414 }
3415 else
3416 {
3417 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3418 slot_getallattrs(remoteslot_part);
3419 }
3420 MemoryContextSwitchTo(oldctx);
3421
3422 /* Check if we can do the update or delete on the leaf partition. */
3423 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3424 {
3425 part_entry = logicalrep_partition_open(relmapentry, partrel,
3426 attrmap);
3427 check_relation_updatable(part_entry);
3428 }
3429
3430 switch (operation)
3431 {
3432 case CMD_INSERT:
3433 apply_handle_insert_internal(edata, partrelinfo,
3434 remoteslot_part);
3435 break;
3436
3437 case CMD_DELETE:
3438 apply_handle_delete_internal(edata, partrelinfo,
3439 remoteslot_part,
3440 part_entry->localindexoid);
3441 break;
3442
3443 case CMD_UPDATE:
3444
3445 /*
3446 * For UPDATE, depending on whether or not the updated tuple
3447 * satisfies the partition's constraint, perform a simple UPDATE
3448 * of the partition or move the updated tuple into a different
3449 * suitable partition.
3450 */
3451 {
3452 TupleTableSlot *localslot;
3453 ResultRelInfo *partrelinfo_new;
3454 Relation partrel_new;
3455 bool found;
3456 EPQState epqstate;
3457 ConflictTupleInfo conflicttuple = {0};
3458
3459 /* Get the matching local tuple from the partition. */
3460 found = FindReplTupleInLocalRel(edata, partrel,
3461 &part_entry->remoterel,
3462 part_entry->localindexoid,
3463 remoteslot_part, &localslot);
3464 if (!found)
3465 {
3467 TupleTableSlot *newslot = localslot;
3468
3469 /*
3470 * Detecting whether the tuple was recently deleted or
3471 * never existed is crucial to avoid misleading the user
3472 * during conflict handling.
3473 */
3474 if (FindDeletedTupleInLocalRel(partrel,
3475 part_entry->localindexoid,
3476 remoteslot_part,
3477 &conflicttuple.xmin,
3478 &conflicttuple.origin,
3479 &conflicttuple.ts) &&
3480 conflicttuple.origin != replorigin_session_origin)
3482 else
3484
3485 /* Store the new tuple for conflict reporting */
3486 slot_store_data(newslot, part_entry, newtup);
3487
3488 /*
3489 * The tuple to be updated could not be found or was
3490 * deleted. Do nothing except for emitting a log message.
3491 */
3492 ReportApplyConflict(estate, partrelinfo, LOG,
3493 type, remoteslot_part, newslot,
3494 list_make1(&conflicttuple));
3495
3496 return;
3497 }
3498
3499 /*
3500 * Report the conflict if the tuple was modified by a
3501 * different origin.
3502 */
3503 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3504 &conflicttuple.origin,
3505 &conflicttuple.ts) &&
3506 conflicttuple.origin != replorigin_session_origin)
3507 {
3508 TupleTableSlot *newslot;
3509
3510 /* Store the new tuple for conflict reporting */
3511 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3512 slot_store_data(newslot, part_entry, newtup);
3513
3514 conflicttuple.slot = localslot;
3515
3516 ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3517 remoteslot_part, newslot,
3518 list_make1(&conflicttuple));
3519 }
3520
3521 /*
3522 * Apply the update to the local tuple, putting the result in
3523 * remoteslot_part.
3524 */
3526 slot_modify_data(remoteslot_part, localslot, part_entry,
3527 newtup);
3528 MemoryContextSwitchTo(oldctx);
3529
3530 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3531
3532 /*
3533 * Does the updated tuple still satisfy the current
3534 * partition's constraint?
3535 */
3536 if (!partrel->rd_rel->relispartition ||
3537 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3538 false))
3539 {
3540 /*
3541 * Yes, so simply UPDATE the partition. We don't call
3542 * apply_handle_update_internal() here, which would
3543 * normally do the following work, to avoid repeating some
3544 * work already done above to find the local tuple in the
3545 * partition.
3546 */
3547 InitConflictIndexes(partrelinfo);
3548
3549 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3551 ACL_UPDATE);
3552 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3553 localslot, remoteslot_part);
3554 }
3555 else
3556 {
3557 /* Move the tuple into the new partition. */
3558
3559 /*
3560 * New partition will be found using tuple routing, which
3561 * can only occur via the parent table. We might need to
3562 * convert the tuple to the parent's rowtype. Note that
3563 * this is the tuple found in the partition, not the
3564 * original search tuple received by this function.
3565 */
3566 if (map)
3567 {
3568 TupleConversionMap *PartitionToRootMap =
3570 RelationGetDescr(parentrel));
3571
3572 remoteslot =
3573 execute_attr_map_slot(PartitionToRootMap->attrMap,
3574 remoteslot_part, remoteslot);
3575 }
3576 else
3577 {
3578 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3579 slot_getallattrs(remoteslot);
3580 }
3581
3582 /* Find the new partition. */
3584 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3585 proute, remoteslot,
3586 estate);
3587 MemoryContextSwitchTo(oldctx);
3588 Assert(partrelinfo_new != partrelinfo);
3589 partrel_new = partrelinfo_new->ri_RelationDesc;
3590
3591 /* Check that new partition also has supported relkind. */
3592 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3593 relmapentry->remoterel.relkind,
3595 RelationGetRelationName(partrel_new));
3596
3597 /* DELETE old tuple found in the old partition. */
3598 EvalPlanQualSetSlot(&epqstate, localslot);
3600 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3601
3602 /* INSERT new tuple into the new partition. */
3603
3604 /*
3605 * Convert the replacement tuple to match the destination
3606 * partition rowtype.
3607 */
3609 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3610 if (remoteslot_part == NULL)
3611 remoteslot_part = table_slot_create(partrel_new,
3612 &estate->es_tupleTable);
3613 map = ExecGetRootToChildMap(partrelinfo_new, estate);
3614 if (map != NULL)
3615 {
3616 remoteslot_part = execute_attr_map_slot(map->attrMap,
3617 remoteslot,
3618 remoteslot_part);
3619 }
3620 else
3621 {
3622 remoteslot_part = ExecCopySlot(remoteslot_part,
3623 remoteslot);
3624 slot_getallattrs(remoteslot);
3625 }
3626 MemoryContextSwitchTo(oldctx);
3627 apply_handle_insert_internal(edata, partrelinfo_new,
3628 remoteslot_part);
3629 }
3630
3631 EvalPlanQualEnd(&epqstate);
3632 }
3633 break;
3634
3635 default:
3636 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3637 break;
3638 }
3639}
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:1118
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
Definition: worker.c:3269
ConflictType
Definition: conflict.h:32
@ CT_UPDATE_DELETED
Definition: conflict.h:43
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:37
@ CT_UPDATE_MISSING
Definition: conflict.h:46
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1856
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1326
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
@ CMD_UPDATE
Definition: nodes.h:276
#define makeNode(_type_)
Definition: nodes.h:161
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:646
PartitionTupleRouting * proute
Definition: worker.c:319
ModifyTableState * mtstate
Definition: worker.c:318
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:712
CmdType operation
Definition: execnodes.h:1404
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1408
PlanState ps
Definition: execnodes.h:1403
Plan * plan
Definition: execnodes.h:1165
EState * state
Definition: execnodes.h:1167
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:619
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:103
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:193
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:371
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:524
const char * type

References ACL_DELETE, ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationDelete(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LogicalRepRelMapEntry::localindexoid, LOG, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, ConflictTupleInfo::origin, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, slot_getallattrs(), slot_modify_data(), slot_store_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ApplyExecutionData::targetRelInfo, ConflictTupleInfo::ts, type, and ConflictTupleInfo::xmin.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2586 of file worker.c.

/*
 * Handle a TYPE message: the payload is parsed into a local LogicalRepTyp
 * and nothing further is done with it in the lines shown here.
 *
 * NOTE(review): source line 2590, the condition guarding the early return,
 * is absent from this listing. The reference list for this function names
 * handle_streamed_transaction() and LOGICAL_REP_MSG_TYPE; confirm the exact
 * condition against worker.c.
 */
2587{
2588 LogicalRepTyp typ;
2589
2591 return;
2592
2593 logicalrep_read_typ(s, &typ);
2594}
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:757

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2790 of file worker.c.

/*
 * Handle an UPDATE message. The visible steps are: read the message with
 * logicalrep_read_update(), switch to the table owner's user context unless
 * the subscription has runasowner set, build the executor state, record the
 * changed columns in the permission info, store the search tuple, and then
 * branch on whether the local relation is a partitioned table.
 *
 * NOTE(review): this listing omits source lines 2792, 2810, 2813, 2817-2818,
 * 2824-2825, 2830, 2833, 2847, 2861, 2870, 2875, 2882, 2885, 2891, 2896 and
 * 2898, so several statements below appear truncated. Consult worker.c for
 * the full text before relying on any of them.
 */
2791{
2793 LogicalRepRelId relid;
2794 UserContext ucxt;
2795 ApplyExecutionData *edata;
2796 EState *estate;
2797 LogicalRepTupleData oldtup;
2798 LogicalRepTupleData newtup;
2799 bool has_oldtup;
2800 TupleTableSlot *remoteslot;
2801 RTEPermissionInfo *target_perminfo;
2802 MemoryContext oldctx;
2803 bool run_as_owner;
2804
2805 /*
2806 * Quick return if we are skipping data modification changes or handling
2807 * streamed transactions.
2808 */
2809 if (is_skipping_changes() ||
2811 return;
2812
2814
2815 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2816 &newtup);
2819 {
2820 /*
2821 * The relation can't become interesting in the middle of the
2822 * transaction so it's safe to unlock it.
2823 */
2826 return;
2827 }
2828
2829 /* Set relation for error callback */
2831
2832 /* Check if we can do the update. */
2834
2835 /*
2836 * Make sure that any user-supplied code runs as the table owner, unless
2837 * the user has opted out of that behavior.
2838 */
2839 run_as_owner = MySubscription->runasowner;
2840 if (!run_as_owner)
2841 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2842
2843 /* Initialize the executor state. */
2844 edata = create_edata_for_relation(rel);
2845 estate = edata->estate;
2846 remoteslot = ExecInitExtraTupleSlot(estate,
2848 &TTSOpsVirtual);
2849
2850 /*
2851 * Populate updatedCols so that per-column triggers can fire, and so
2852 * executor can correctly pass down indexUnchanged hint. This could
2853 * include more columns than were actually changed on the publisher
2854 * because the logical replication protocol doesn't contain that
2855 * information. But it would for example exclude columns that only exist
2856 * on the subscriber, since we are not touching those.
2857 */
2858 target_perminfo = list_nth(estate->es_rteperminfos, 0);
2859 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2860 {
2862 int remoteattnum = rel->attrmap->attnums[i];
2863
/* Only non-dropped local columns with a non-negative remote attnum are considered. */
2864 if (!att->attisdropped && remoteattnum >= 0)
2865 {
2866 Assert(remoteattnum < newtup.ncols);
2867 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2868 target_perminfo->updatedCols =
2869 bms_add_member(target_perminfo->updatedCols,
2871 }
2872 }
2873
2874 /* Build the search tuple. */
/* The old tuple is used when the message carried one, otherwise the new tuple. */
2876 slot_store_data(remoteslot, rel,
2877 has_oldtup ? &oldtup : &newtup);
2878 MemoryContextSwitchTo(oldctx);
2879
2880 /* For a partitioned table, apply update to correct partition. */
2881 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2883 remoteslot, &newtup, CMD_UPDATE);
2884 else
2886 remoteslot, &newtup, rel->localindexoid);
2887
2888 finish_edata(edata);
2889
2890 /* Reset relation for error callback */
2892
/* Undo the user-context switch made above, if one was made. */
2893 if (!run_as_owner)
2894 RestoreUserContext(&ucxt);
2895
2897
2899}
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2907
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:814
int i
Definition: isn.c:77
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:487
AttrNumber * attnums
Definition: attmap.h:36
bool attisdropped
Definition: tupdesc.h:77
List * es_rteperminfos
Definition: execnodes.h:668
Bitmapset * updatedCols
Definition: parsenodes.h:1326
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:122
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:175

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), CompactAttribute::attisdropped, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescCompactAttr(), and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2907 of file worker.c.

/*
 * Apply an UPDATE to one local relation. The matching local tuple is looked
 * up with FindReplTupleInLocalRel(); when it is found the row is modified
 * through ExecSimpleRelationUpdate(), otherwise the function only reports a
 * conflict at LOG level and changes nothing.
 *
 * Indexes are opened at the top and closed in the cleanup section, together
 * with the EvalPlanQual state.
 *
 * NOTE(review): source lines 2953, 2959, 2968, 2974, 2986 and 2988 are
 * missing from this listing, so a few statements below are incomplete
 * (notably the conflict-report call in the "found" branch and the
 * assignments that pick the conflict type in the "else" branch).
 */
2912{
2913 EState *estate = edata->estate;
2914 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2915 Relation localrel = relinfo->ri_RelationDesc;
2916 EPQState epqstate;
2917 TupleTableSlot *localslot = NULL;
2918 ConflictTupleInfo conflicttuple = {0};
2919 bool found;
2920 MemoryContext oldctx;
2921
2922 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2923 ExecOpenIndices(relinfo, false);
2924
2925 found = FindReplTupleInLocalRel(edata, localrel,
2926 &relmapentry->remoterel,
2927 localindexoid,
2928 remoteslot, &localslot);
2929
2930 /*
2931 * Tuple found.
2932 *
2933 * Note this will fail if there are other conflicting unique indexes.
2934 */
2935 if (found)
2936 {
2937 /*
2938 * Report the conflict if the tuple was modified by a different
2939 * origin.
2940 */
2941 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2942 &conflicttuple.origin, &conflicttuple.ts) &&
2943 conflicttuple.origin != replorigin_session_origin)
2944 {
2945 TupleTableSlot *newslot;
2946
2947 /* Store the new tuple for conflict reporting */
2948 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2949 slot_store_data(newslot, relmapentry, newtup);
2950
2951 conflicttuple.slot = localslot;
2952
2954 remoteslot, newslot,
2955 list_make1(&conflicttuple));
2956 }
2957
2958 /* Process and store remote tuple in the slot */
2960 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2961 MemoryContextSwitchTo(oldctx);
2962
2963 EvalPlanQualSetSlot(&epqstate, remoteslot);
2964
2965 InitConflictIndexes(relinfo);
2966
2967 /* Do the actual update. */
2969 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2970 remoteslot);
2971 }
2972 else
2973 {
2975 TupleTableSlot *newslot = localslot;
2976
2977 /*
2978 * Detecting whether the tuple was recently deleted or never existed
2979 * is crucial to avoid misleading the user during conflict handling.
2980 */
2981 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2982 &conflicttuple.xmin,
2983 &conflicttuple.origin,
2984 &conflicttuple.ts) &&
2985 conflicttuple.origin != replorigin_session_origin)
2987 else
2989
2990 /* Store the new tuple for conflict reporting */
2991 slot_store_data(newslot, relmapentry, newtup);
2992
2993 /*
2994 * The tuple to be updated could not be found or was deleted. Do
2995 * nothing except for emitting a log message.
2996 */
2997 ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2998 list_make1(&conflicttuple));
2999 }
3000
3001 /* Cleanup. */
3002 ExecCloseIndices(relinfo);
3003 EvalPlanQualEnd(&epqstate);
3004}

References ACL_UPDATE, CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LOG, MemoryContextSwitchTo(), NIL, ConflictTupleInfo::origin, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, slot_modify_data(), slot_store_data(), table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ConflictTupleInfo::ts, type, and ConflictTupleInfo::xmin.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2260 of file worker.c.

/*
 * Replay the changes spooled to a file for a streamed transaction. Each
 * on-disk record is an int length followed by that many bytes; the bytes are
 * read into a reusable buffer and wrapped in a read-only StringInfo. A
 * DEBUG1 progress line is emitted every 1000 changes and once more at the
 * end.
 *
 * The loop ends either at end of file or when stream_fd has been cleared
 * while processing a record, in which case ensure_last_message() is called
 * with the file position captured just after that record was read.
 *
 * NOTE(review): source lines 2271-2272, 2275, 2282, 2285, 2293, 2310, 2312,
 * 2321, 2325, 2351, 2353, 2355 and 2378 are missing from this listing. That
 * includes the declaration of s2 and, per the surviving comments, the call
 * that dispatches each record.
 */
2262{
2263 int nchanges;
2264 char path[MAXPGPATH];
2265 char *buffer = NULL;
2266 MemoryContext oldcxt;
2267 ResourceOwner oldowner;
2268 int fileno;
2269 off_t offset;
2270
2273
2274 /* Make sure we have an open transaction */
2276
2277 /*
2278 * Allocate file handle and memory required to process all the messages in
2279 * TopTransactionContext to avoid them getting reset after each message is
2280 * processed.
2281 */
2283
2284 /* Open the spool file for the committed/prepared transaction */
2286 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2287
2288 /*
2289 * Make sure the file is owned by the toplevel transaction so that the
2290 * file will not be accidentally closed when aborting a subtransaction.
2291 */
2292 oldowner = CurrentResourceOwner;
2294
2295 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2296
2297 CurrentResourceOwner = oldowner;
2298
/* Initial buffer size; it is resized per record with repalloc() below. */
2299 buffer = palloc(BLCKSZ);
2300
2301 MemoryContextSwitchTo(oldcxt);
2302
2303 remote_final_lsn = lsn;
2304
2305 /*
2306 * Make sure the handle apply_dispatch methods are aware we're in a remote
2307 * transaction.
2308 */
2309 in_remote_transaction = true;
2311
2313
2314 /*
2315 * Read the entries one by one and pass them through the same logic as in
2316 * apply_dispatch.
2317 */
2318 nchanges = 0;
2319 while (true)
2320 {
2322 size_t nbytes;
2323 int len;
2324
2326
2327 /* read length of the on-disk record */
2328 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2329
2330 /* have we reached end of the file? */
2331 if (nbytes == 0)
2332 break;
2333
2334 /* do we have a correct length? */
2335 if (len <= 0)
2336 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2337 len, path);
2338
2339 /* make sure we have sufficiently large buffer */
2340 buffer = repalloc(buffer, len);
2341
2342 /* and finally read the data into the buffer */
2343 BufFileReadExact(stream_fd, buffer, len);
2344
/* Remember the position after this record for ensure_last_message() below. */
2345 BufFileTell(stream_fd, &fileno, &offset);
2346
2347 /* init a stringinfo using the buffer and call apply_dispatch */
2348 initReadOnlyStringInfo(&s2, buffer, len);
2349
2350 /* Ensure we are reading the data into our memory context. */
2352
2354
2356
2357 MemoryContextSwitchTo(oldcxt);
2358
2359 nchanges++;
2360
2361 /*
2362 * It is possible the file has been closed because we have processed
2363 * the transaction end message like stream_commit in which case that
2364 * must be the last message.
2365 */
2366 if (!stream_fd)
2367 {
2368 ensure_last_message(stream_fileset, xid, fileno, offset);
2369 break;
2370 }
2371
2372 if (nchanges % 1000 == 0)
2373 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2374 nchanges, path);
2375 }
2376
2377 if (stream_fd)
2379
2380 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2381 nchanges, path);
2382
2383 return;
2384}
MemoryContext ApplyMessageContext
Definition: worker.c:471
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5367
void apply_dispatch(StringInfo s)
Definition: worker.c:3775
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:2228
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopTransactionContext
Definition: mcxt.c:171
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1610
void * palloc(Size size)
Definition: mcxt.c:1365
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:175
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 5004 of file worker.c.

/*
 * Terminate the current worker process with proc_exit(0), except on the
 * early-return path described in the first comment below, which leaves a
 * parallel apply worker running.
 *
 * NOTE(review): source lines 5006 and 5025-5026 are missing from this
 * listing: the condition guarding the early return and the statement(s)
 * that perform the last-start-time reset. The reference list for this
 * function names am_parallel_apply_worker(), am_leader_apply_worker() and
 * ApplyLauncherForgetWorkerStartTime(); confirm against worker.c.
 */
5005{
5007 {
5008 /*
5009 * Don't stop the parallel apply worker as the leader will detect the
5010 * subscription parameter change and restart logical replication later
5011 * anyway. This also prevents the leader from reporting errors when
5012 * trying to communicate with a stopped parallel apply worker, which
5013 * would accidentally disable subscriptions if disable_on_error was
5014 * set.
5015 */
5016 return;
5017 }
5018
5019 /*
5020 * Reset the last-start time for this apply worker so that the launcher
5021 * will restart it without waiting for wal_retrieve_retry_interval if the
5022 * subscription is still active, and so that we won't leak that hash table
5023 * entry if it isn't.
5024 */
5027
5028 proc_exit(0);
5029}
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1154
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), maybe_reread_subscription(), and resume_conflict_info_retention().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 5923 of file worker.c.

/*
 * Worker entry point. main_arg carries the worker slot number as an int32
 * Datum; it is handed to SetupApplyOrSyncWorker() and the process finally
 * ends with proc_exit(0).
 *
 * NOTE(review): source lines 5927, 5931 and 5933 are missing from this
 * listing. The reference list for this function names
 * InitializingApplyWorker and run_apply_worker(); confirm where they are
 * used against worker.c.
 */
5924{
5925 int worker_slot = DatumGetInt32(main_arg);
5926
5928
5929 SetupApplyOrSyncWorker(worker_slot);
5930
5932
5934
5935 proc_exit(0);
5936}
static void run_apply_worker(void)
Definition: worker.c:5624
bool InitializingApplyWorker
Definition: worker.c:499
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5869
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 6269 of file worker.c.

/*
 * End-of-transaction processing for logical replication workers. Work is
 * done only when isCommit is true and on_commit_wakeup_workers_subids is
 * non-empty: LogicalRepWorkerLock is taken in shared mode and the workers
 * of each listed subscription are visited.
 *
 * NOTE(review): source lines 6276, 6287 and 6294 are missing from this
 * listing: the outer loop header, the per-worker action and the statement
 * after the final comment. The reference list for this function names
 * logicalrep_worker_wakeup_ptr(); confirm against worker.c.
 */
6270{
6271 if (isCommit && on_commit_wakeup_workers_subids != NIL)
6272 {
6273 ListCell *lc;
6274
6275 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6277 {
6278 Oid subid = lfirst_oid(lc);
6279 List *workers;
6280 ListCell *lc2;
6281
/* Arguments are only_running = true and acquire_lock = false; the lock was taken above. */
6282 workers = logicalrep_workers_find(subid, true, false);
6283 foreach(lc2, workers)
6284 {
6285 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6286
6288 }
6289 }
6290 LWLockRelease(LogicalRepWorkerLock);
6291 }
6292
6293 /* The List storage will be reclaimed automatically in xact cleanup. */
6295}
static List * on_commit_wakeup_workers_subids
Definition: worker.c:482
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:293
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:746
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ can_advance_nonremovable_xid()

static bool can_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4401 of file worker.c.

/*
 * Returns true unless one of the two guarded early returns below applies.
 *
 * NOTE(review): the conditions on source lines 4408 and 4412 are missing
 * from this listing. The reference list for this function names
 * am_leader_apply_worker() and Subscription::retaindeadtuples, which matches
 * the two surviving comments; confirm the exact tests against worker.c.
 */
4402{
4403 /*
4404 * It is sufficient to manage non-removable transaction ID for a
4405 * subscription by the main apply worker to detect update_deleted reliably
4406 * even for table sync or parallel apply workers.
4407 */
4409 return false;
4410
4411 /* No need to advance if retaining dead tuples is not required */
4413 return false;
4414
4415 return true;
4416}

References am_leader_apply_worker(), MySubscription, and Subscription::retaindeadtuples.

Referenced by maybe_advance_nonremovable_xid().

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inline static

Definition at line 5367 of file worker.c.

/*
 * Format the name of the changes file for a subscription/transaction pair
 * as "<subid>-<xid>.changes" into path. snprintf() is bounded by MAXPGPATH,
 * so path must have room for at least that many bytes.
 */
5368{
5369 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5370}
#define snprintf
Definition: port.h:260

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2749 of file worker.c.

/*
 * Return quietly when the local relation is a partitioned table or when
 * rel->updatable is set; otherwise raise an ERROR with code
 * ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE. Two different messages exist,
 * both naming the remote schema and relation.
 *
 * NOTE(review): source line 2766, the condition choosing the first message,
 * is missing from this listing. The reference list for this function names
 * OidIsValid and GetRelationIdentityOrPK(); confirm against worker.c.
 */
2750{
2751 /*
2752 * For partitioned tables, we only need to care if the target partition is
2753 * updatable (aka has PK or RI defined for it).
2754 */
2755 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2756 return;
2757
2758 /* Updatable, no error. */
2759 if (rel->updatable)
2760 return;
2761
2762 /*
2763 * We are in error mode so it's fine this is somewhat slow. It's better to
2764 * give user correct error.
2765 */
2767 {
2768 ereport(ERROR,
2769 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2770 errmsg("publisher did not send replica identity column "
2771 "expected by the logical replication target relation \"%s.%s\"",
2772 rel->remoterel.nspname, rel->remoterel.relname)));
2773 }
2774
/* Reached only when the branch above did not raise. */
2775 ereport(ERROR,
2776 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2777 errmsg("logical replication target relation \"%s.%s\" has "
2778 "neither REPLICA IDENTITY index nor PRIMARY "
2779 "KEY and published relation does not have "
2780 "REPLICA IDENTITY FULL",
2781 rel->remoterel.nspname, rel->remoterel.relname)));
2782}
#define OidIsValid(objectId)
Definition: c.h:777
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:904

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inline static

Definition at line 5564 of file worker.c.

/*
 * Reset the file-level subxact_data bookkeeping; the only statement visible
 * here clears the subxacts pointer.
 *
 * NOTE(review): source lines 5566-5567 and 5570-5572 are missing from this
 * listing. The reference list for this function names pfree(),
 * InvalidTransactionId and the nsubxacts, nsubxacts_max and subxact_last
 * fields; confirm the remaining statements against worker.c.
 */
5565{
5568
5569 subxact_data.subxacts = NULL;
5573}
static ApplySubXactData subxact_data
Definition: worker.c:545
void pfree(void *pointer)
Definition: mcxt.c:1594
uint32 nsubxacts
Definition: worker.c:539
uint32 nsubxacts_max
Definition: worker.c:540
SubXactInfo * subxacts
Definition: worker.c:542
TransactionId subxact_last
Definition: worker.c:541

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 6070 of file worker.c.

/*
 * Reset subskiplsn in this subscription's pg_subscription row to
 * InvalidXLogRecPtr. The catalog is updated only when the stored value
 * still equals the skiplsn cached in MySubscription at entry. A transaction
 * is started here when none is in progress (started_tx records that).
 *
 * finish_lsn is compared with the cached skip LSN after the update; a
 * mismatch triggers the report whose message text appears below.
 *
 * NOTE(review): source lines 6078, 6083, 6091, 6098, 6104, 6141, 6151 and
 * 6154 are missing from this listing, including the guard for the first
 * early return, the snapshot and transaction calls, and the ereport() head.
 */
6071{
6072 Relation rel;
6073 Form_pg_subscription subform;
6074 HeapTuple tup;
6075 XLogRecPtr myskiplsn = MySubscription->skiplsn;
6076 bool started_tx = false;
6077
6079 return;
6080
6081 if (!IsTransactionState())
6082 {
6084 started_tx = true;
6085 }
6086
6087 /*
6088 * Updating pg_subscription might involve TOAST table access, so ensure we
6089 * have a valid snapshot.
6090 */
6092
6093 /*
6094 * Protect subskiplsn of pg_subscription from being concurrently updated
6095 * while clearing it.
6096 */
6097 LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6099
6100 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6101
6102 /* Fetch the existing tuple. */
6103 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6105
6106 if (!HeapTupleIsValid(tup))
6107 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6108
6109 subform = (Form_pg_subscription) GETSTRUCT(tup);
6110
6111 /*
6112 * Clear the subskiplsn. If the user has already changed subskiplsn before
6113 * clearing it we don't update the catalog and the replication origin
6114 * state won't get advanced. So in the worst case, if the server crashes
6115 * before sending an acknowledgment of the flush position the transaction
6116 * will be sent again and the user needs to set subskiplsn again. We can
6117 * reduce the possibility by logging a replication origin WAL record to
6118 * advance the origin LSN instead but there is no way to advance the
6119 * origin timestamp and it doesn't seem to be worth doing anything about
6120 * it since it's a very rare case.
6121 */
6122 if (subform->subskiplsn == myskiplsn)
6123 {
6124 bool nulls[Natts_pg_subscription];
6125 bool replaces[Natts_pg_subscription];
6126 Datum values[Natts_pg_subscription];
6127
6128 memset(values, 0, sizeof(values));
6129 memset(nulls, false, sizeof(nulls));
6130 memset(replaces, false, sizeof(replaces));
6131
6132 /* reset subskiplsn */
6133 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6134 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6135
/* Only the subskiplsn column is flagged in replaces[]. */
6136 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6137 replaces);
6138 CatalogTupleUpdate(rel, &tup->t_self, tup);
6139
6140 if (myskiplsn != finish_lsn)
6142 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6143 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6144 LSN_FORMAT_ARGS(finish_lsn),
6145 LSN_FORMAT_ARGS(myskiplsn)));
6146 }
6147
6148 heap_freetuple(tup);
6149 table_close(rel, NoLock);
6150
6152
6153 if (started_tx)
6155}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define likely(x)
Definition: c.h:406
int errdetail(const char *fmt,...)
Definition: elog.c:1216
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
void PopActiveSnapshot(void)
Definition: snapmgr.c:775
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT(), GetTransactionSnapshot(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsValid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData * create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 870 of file worker.c.

/*
 * Build and return the ApplyExecutionData for a target relation: a
 * zero-initialised struct holding a fresh executor state, a one-entry range
 * table describing rel->localrel, and a ResultRelInfo initialised on that
 * already-open relation. Fields not set here stay NULL because the struct
 * comes from palloc0().
 *
 * NOTE(review): source lines 892, 910 and 916 are missing from this
 * listing: the last argument of ExecInitRangeTable(), the left-hand side of
 * the lappend() assignment, and the statement after "Prepare to catch AFTER
 * triggers". The reference list for this function names
 * bms_make_singleton() and AfterTriggerBeginQuery(); confirm against
 * worker.c.
 */
871{
872 ApplyExecutionData *edata;
873 EState *estate;
874 RangeTblEntry *rte;
875 List *perminfos = NIL;
876 ResultRelInfo *resultRelInfo;
877
878 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
879 edata->targetRel = rel;
880
881 edata->estate = estate = CreateExecutorState();
882
883 rte = makeNode(RangeTblEntry);
884 rte->rtekind = RTE_RELATION;
885 rte->relid = RelationGetRelid(rel->localrel);
886 rte->relkind = rel->localrel->rd_rel->relkind;
887 rte->rellockmode = AccessShareLock;
888
889 addRTEPermissionInfo(&perminfos, rte);
890
891 ExecInitRangeTable(estate, list_make1(rte), perminfos,
893
894 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
895
896 /*
897 * Use Relation opened by logicalrep_rel_open() instead of opening it
898 * again.
899 */
900 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
901
902 /*
903 * We put the ResultRelInfo in the es_opened_result_relations list, even
904 * though we don't populate the es_result_relations array. That's a bit
905 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
906 *
907 * ExecOpenIndices() is not called here either, each execution path doing
908 * an apply operation being responsible for that.
909 */
911 lappend(estate->es_opened_result_relations, resultRelInfo);
912
913 estate->es_output_cid = GetCurrentCommandId(true);
914
915 /* Prepare to catch AFTER triggers. */
917
918 /* other fields of edata remain NULL for now */
919
920 return edata;
921}
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1243
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:773
EState * CreateExecutorState(void)
Definition: execUtils.c:88
void * palloc0(Size size)
Definition: mcxt.c:1395
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1043
#define RelationGetRelid(relation)
Definition: rel.h:515
List * es_opened_result_relations
Definition: execnodes.h:688
CommandId es_output_cid
Definition: execnodes.h:682
RTEKind rtekind
Definition: parsenodes.h:1078
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5104
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:830

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), bms_make_singleton(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 5943 of file worker.c.

5944{
5945 /*
5946 * Emit the error message, and recover from the error state to an idle
5947 * state
5948 */
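5949 HOLD_INTERRUPTS();
5950
5951 EmitErrorReport();
5952 AbortOutOfAnyTransaction();
5953 FlushErrorState();
5954
5955 RESUME_INTERRUPTS();
5956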
5957 /*
5958 * Report the worker failed during sequence synchronization, table
5959 * synchronization, or apply.
5960 */
5963
5964 /* Disable the subscription */
5966
5967 /*
5968 * Updating pg_subscription might involve TOAST table access, so ensure we
5969 * have a valid snapshot.
5970 */
5972
5976
5977 /* Ensure we remove no-longer-useful entry for worker's start time */
5980
5981 /* Notify the subscription has been disabled and exit */
5982 ereport(LOG,
5983 errmsg("subscription \"%s\" has been disabled because of an error",
5984 MySubscription->name));
5985
5986 /*
5987 * Skip the track_commit_timestamp check when disabling the worker due to
5988 * an error, as verifying commit timestamps is unnecessary in this
5989 * context.
5990 */
5994
5995 proc_exit(0);
5996}
void EmitErrorReport(void)
Definition: elog.c:1704
void FlushErrorState(void)
Definition: elog.c:1884
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
LogicalRepWorkerType type
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4880

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), ApplyLauncherForgetWorkerStartTime(), CheckSubDeadTupleRetention(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), PopActiveSnapshot(), proc_exit(), PushActiveSnapshot(), RESUME_INTERRUPTS, Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), LogicalRepWorker::subid, LogicalRepWorker::type, and WARNING.

Referenced by start_apply(), start_sequence_sync(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 2228 of file worker.c.

2230{
2231 char path[MAXPGPATH];
2232 BufFile *fd;
2233 int last_fileno;
2234 off_t last_offset;
2235
2237
2239
2241
2242 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2243
2244 BufFileSeek(fd, 0, 0, SEEK_END);
2245 BufFileTell(fd, &last_fileno, &last_offset);
2246
2248
2250
2251 if (last_fileno != fileno || last_offset != offset)
2252 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2253 path);
2254}
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindDeletedTupleInLocalRel()

static bool FindDeletedTupleInLocalRel ( Relation  localrel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TransactionId delete_xid,
RepOriginId delete_origin,
TimestampTz delete_time 
)
static

Definition at line 3269 of file worker.c.

3273{
3274 TransactionId oldestxmin;
3275
3276 /*
3277 * Return false if either dead tuples are not retained or commit timestamp
3278 * data is not available.
3279 */
3281 return false;
3282
3283 /*
3284 * For conflict detection, we use the leader worker's
3285 * oldest_nonremovable_xid value instead of invoking
3286 * GetOldestNonRemovableTransactionId() or using the conflict detection
3287 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3288 * identify tuples that were recently deleted. These deleted tuples are no
3289 * longer visible to concurrent transactions. However, if a remote update
3290 * matches such a tuple, we log an update_deleted conflict.
3291 *
3292 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3293 * transaction IDs older than oldest_nonremovable_xid, for our current
3294 * purpose, it is acceptable to treat tuples deleted by transactions prior
3295 * to oldest_nonremovable_xid as update_missing conflicts.
3296 */
3298 {
3300 }
3301 else
3302 {
3303 LogicalRepWorker *leader;
3304
3305 /*
3306 * Obtain the information from the leader apply worker as only the
3307 * leader manages oldest_nonremovable_xid (see
3308 * maybe_advance_nonremovable_xid() for details).
3309 */
3310 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3313 false);
3314 if (!leader)
3315 {
3316 ereport(ERROR,
3317 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3318 errmsg("could not detect conflict as the leader apply worker has exited")));
3319 }
3320
3321 SpinLockAcquire(&leader->relmutex);
3322 oldestxmin = leader->oldest_nonremovable_xid;
3323 SpinLockRelease(&leader->relmutex);
3324 LWLockRelease(LogicalRepWorkerLock);
3325 }
3326
3327 /*
3328 * Return false if the leader apply worker has stopped retaining
3329 * information for detecting conflicts. This implies that update_deleted
3330 * can no longer be reliably detected.
3331 */
3332 if (!TransactionIdIsValid(oldestxmin))
3333 return false;
3334
3335 if (OidIsValid(localidxoid) &&
3336 IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3337 return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3338 remoteslot, oldestxmin,
3339 delete_xid, delete_origin,
3340 delete_time);
3341 else
3342 return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3343 oldestxmin, delete_xid,
3344 delete_origin, delete_time);
3345}
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition: worker.c:3235
bool track_commit_timestamp
Definition: commit_ts.c:109
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TransactionId oldest_nonremovable_xid

References am_leader_apply_worker(), ereport, errcode(), errmsg(), ERROR, InvalidOid, IsIndexUsableForFindingDeletedTuple(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, MySubscription, OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, RelationFindDeletedTupleInfoByIndex(), RelationFindDeletedTupleInfoSeq(), LogicalRepWorker::relmutex, Subscription::retaindeadtuples, SpinLockAcquire, SpinLockRelease, LogicalRepWorker::subid, track_commit_timestamp, TransactionIdIsValid, and WORKERTYPE_APPLY.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 3174 of file worker.c.

3179{
3180 EState *estate = edata->estate;
3181 bool found;
3182
3183 /*
3184 * Regardless of the top-level operation, we're performing a read here, so
3185 * check for SELECT privileges.
3186 */
3188
3189 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3190
3191 Assert(OidIsValid(localidxoid) ||
3192 (remoterel->replident == REPLICA_IDENTITY_FULL));
3193
3194 if (OidIsValid(localidxoid))
3195 {
3196#ifdef USE_ASSERT_CHECKING
3197 Relation idxrel = index_open(localidxoid, AccessShareLock);
3198
3199 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3200 Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3201 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3203 edata->targetRel->attrmap)));
3205#endif
3206
3207 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3209 remoteslot, *localslot);
3210 }
3211 else
3213 remoteslot, *localslot);
3214
3215 return found;
3216}
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition: relation.c:834

References AccessShareLock, ACL_SELECT, Assert(), LogicalRepRelMapEntry::attrmap, EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 928 of file worker.c.

929{
930 EState *estate = edata->estate;
931
932 /* Handle any queued AFTER triggers. */
933 AfterTriggerEndQuery(estate);
934
935 /* Shut down tuple routing, if any was done. */
936 if (edata->proute)
937 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
938
939 /*
940 * Cleanup. It might seem that we should call ExecCloseResultRelations()
941 * here, but we intentionally don't. It would close the rel we added to
942 * es_opened_result_relations above, which is wrong because we took no
943 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
944 * any other relations opened during execution.
945 */
946 ExecResetTupleTable(estate->es_tupleTable, false);
947 FreeExecutorState(estate);
948 pfree(edata);
949}
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1380
void FreeExecutorState(EState *estate)
Definition: execUtils.c:192
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5124

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_candidate_xid()

static void get_candidate_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4453 of file worker.c.

4454{
4455 TransactionId oldest_running_xid;
4457
4458 /*
4459 * Use last_recv_time when applying changes in the loop to avoid
4460 * unnecessary system time retrieval. If last_recv_time is not available,
4461 * obtain the current timestamp.
4462 */
4463 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4464
4465 /*
4466 * Compute the candidate_xid and request the publisher status at most once
4467 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4468 * details on how this value is dynamically adjusted. This is to avoid
4469 * using CPU and network resources without making much progress.
4470 */
4472 rdt_data->xid_advance_interval))
4473 return;
4474
4475 /*
4476 * Immediately update the timer, even if the function returns later
4477 * without setting candidate_xid due to inactivity on the subscriber. This
4478 * avoids frequent calls to GetOldestActiveTransactionId.
4479 */
4480 rdt_data->candidate_xid_time = now;
4481
4482 /*
4483 * Consider transactions in the current database, as only dead tuples from
4484 * this database are required for conflict detection.
4485 */
4486 oldest_running_xid = GetOldestActiveTransactionId(false, false);
4487
4488 /*
4489 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4490 * of its previously computed value.
4491 */
4493 oldest_running_xid));
4494
4495 /* Return if the oldest_nonremovable_xid cannot be advanced */
4497 oldest_running_xid))
4498 {
4499 adjust_xid_advance_interval(rdt_data, false);
4500 return;
4501 }
4502
4503 adjust_xid_advance_interval(rdt_data, true);
4504
4505 rdt_data->candidate_xid = oldest_running_xid;
4507
4508 /* process the next phase */
4509 process_rdt_phase_transition(rdt_data, false);
4510}
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
Definition: worker.c:4955
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4423
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
int64 TimestampTz
Definition: timestamp.h:39
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition: procarray.c:2833
TimestampTz last_recv_time
Definition: worker.c:443
TimestampTz candidate_xid_time
Definition: worker.c:444
RetainDeadTuplesPhase phase
Definition: worker.c:403
TransactionId candidate_xid
Definition: worker.c:430
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References adjust_xid_advance_interval(), Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, GetCurrentTimestamp(), GetOldestActiveTransactionId(), RetainDeadTuplesData::last_recv_time, MyLogicalRepWorker, now(), LogicalRepWorker::oldest_nonremovable_xid, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, TimestampDifferenceExceeds(), TransactionIdEquals, TransactionIdPrecedesOrEquals(), and RetainDeadTuplesData::xid_advance_interval.

Referenced by process_rdt_phase_transition().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool *  have_pending_txes 
)
static

Definition at line 3895 of file worker.c.

3897{
3898 dlist_mutable_iter iter;
3899 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3900
3901 *write = InvalidXLogRecPtr;
3902 *flush = InvalidXLogRecPtr;
3903
3904 dlist_foreach_modify(iter, &lsn_mapping)
3905 {
3906 FlushPosition *pos =
3907 dlist_container(FlushPosition, node, iter.cur);
3908
3909 *write = pos->remote_end;
3910
3911 if (pos->local_end <= local_flush)
3912 {
3913 *flush = pos->remote_end;
3914 dlist_delete(iter.cur);
3915 pfree(pos);
3916 }
3917 else
3918 {
3919 /*
3920 * Don't want to uselessly iterate over the rest of the list which
3921 * could potentially be long. Instead get the last element and
3922 * grab the write position from there.
3923 */
3924 pos = dlist_tail_element(FlushPosition, node,
3925 &lsn_mapping);
3926 *write = pos->remote_end;
3927 *have_pending_txes = true;
3928 return;
3929 }
3930 }
3931
3932 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3933}
static dlist_head lsn_mapping
Definition: worker.c:308
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:305
XLogRecPtr local_end
Definition: worker.c:304
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6571

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback(), and wait_for_local_flush().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 6316 of file worker.c.

6317{
6318 *winfo = NULL;
6319
6320 if (am_parallel_apply_worker())
6321 {
6322 return TRANS_PARALLEL_APPLY;
6323 }
6324
6325 /*
6326 * If we are processing this transaction using a parallel apply worker
6327 * then either we send the changes to the parallel worker or if the worker
6328 * is busy then serialize the changes to the file which will later be
6329 * processed by the parallel worker.
6330 */
6331 *winfo = pa_find_worker(xid);
6332
6333 if (*winfo && (*winfo)->serialize_changes)
6334 {
6335 return TRANS_LEADER_PARTIAL_SERIALIZE;
6336 }
6337 else if (*winfo)
6338 {
6339 return TRANS_LEADER_SEND_TO_PARALLEL;
6340 }
6341
6342 /*
6343 * If there is no parallel worker involved to process this transaction
6344 * then we either directly apply the change or serialize it to a file
6345 * which will later be applied when the transaction finish message is
6346 * processed.
6347 */
6348 else if (in_streamed_transaction)
6349 {
6350 return TRANS_LEADER_SERIALIZE;
6351 }
6352 else
6353 {
6354 return TRANS_LEADER_APPLY;
6355 }
6356}
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 777 of file worker.c.

778{
779 TransactionId current_xid;
781 TransApplyAction apply_action;
782 StringInfoData original_msg;
783
784 apply_action = get_transaction_apply_action(stream_xid, &winfo);
785
786 /* not in streaming mode */
787 if (apply_action == TRANS_LEADER_APPLY)
788 return false;
789
791
792 /*
793 * The parallel apply worker needs the xid in this message to decide
794 * whether to define a savepoint, so save the original message that has
795 * not moved the cursor after the xid. We will serialize this message to a
796 * file in PARTIAL_SERIALIZE mode.
797 */
798 original_msg = *s;
799
800 /*
801 * We should have received XID of the subxact as the first part of the
802 * message, so extract it.
803 */
804 current_xid = pq_getmsgint(s, 4);
805
806 if (!TransactionIdIsValid(current_xid))
808 (errcode(ERRCODE_PROTOCOL_VIOLATION),
809 errmsg_internal("invalid transaction ID in streamed replication transaction")));
810
811 switch (apply_action)
812 {
815
816 /* Add the new subxact to the array (unless already there). */
817 subxact_info_add(current_xid);
818
819 /* Write the change to the current file */
821 return true;
822
824 Assert(winfo);
825
826 /*
827 * XXX The publisher side doesn't always send relation/type update
828 * messages after the streaming transaction, so also update the
829 * relation/type in leader apply worker. See function
830 * cleanup_rel_sync_cache.
831 */
832 if (pa_send_data(winfo, s->len, s->data))
833 return (action != LOGICAL_REP_MSG_RELATION &&
835
836 /*
837 * Switch to serialize mode when we are not able to send the
838 * change to parallel apply worker.
839 */
840 pa_switch_to_partial_serialize(winfo, false);
841
842 /* fall through */
844 stream_write_change(action, &original_msg);
845
846 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
847 return (action != LOGICAL_REP_MSG_RELATION &&
849
852
853 /* Define a savepoint for a subxact if needed. */
854 pa_start_subtrans(current_xid, stream_xid);
855 return false;
856
857 default:
858 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
859 return false; /* silence compiler warning */
860 }
861}
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:5282
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415

References generate_unaccent_rules::action, Assert(), StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 5737 of file worker.c.

5738{
5739 MemoryContext oldctx;
5740
5741 /* Run as replica session replication role. */
5742 SetConfigOption("session_replication_role", "replica",
5744
5745 /* Connect to our database. */
5748 0);
5749
5750 /*
5751 * Set always-secure search path, so malicious users can't redirect user
5752 * code (e.g. pg_index.indexprs).
5753 */
5754 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5755
5756 /* Load the subscription into persistent memory context. */
5758 "ApplyContext",
5762
5763 /*
5764 * Lock the subscription to prevent it from being concurrently dropped,
5765 * then re-verify its existence. After the initialization, the worker will
5766 * be terminated gracefully if the subscription is dropped.
5767 */
5768 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5771 if (!MySubscription)
5772 {
5773 ereport(LOG,
5774 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5776
5777 /* Ensure we remove no-longer-useful entry for worker's start time */
5780
5781 proc_exit(0);
5782 }
5783
5784 MySubscriptionValid = true;
5785 MemoryContextSwitchTo(oldctx);
5786
5787 if (!MySubscription->enabled)
5788 {
5789 ereport(LOG,
5790 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5791 MySubscription->name)));
5792
5794 }
5795
5796 /*
5797 * Restart the worker if retain_dead_tuples was enabled during startup.
5798 *
5799 * At this point, the replication slot used for conflict detection might
5800 * not exist yet, or could be dropped soon if the launcher perceives
5801 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5802 * oldest_nonremovable_xid when the slot is absent or at risk of being
5803 * dropped, a restart is initiated.
5804 *
5805 * The oldest_nonremovable_xid should be initialized only when the
5806 * subscription's retention is active before launching the worker. See
5807 * logicalrep_worker_launch.
5808 */
5809 if (am_leader_apply_worker() &&
5813 {
5814 ereport(LOG,
5815 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5816 MySubscription->name, "retain_dead_tuples"));
5817
5819 }
5820
5821 /* Setup synchronous commit according to the user's wishes */
5822 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5824
5825 /*
5826 * Keep us informed about subscription or role changes. Note that the
5827 * role's superuser privilege can be revoked.
5828 */
5829 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5831 (Datum) 0);
5832
5835 (Datum) 0);
5836
5837 if (am_tablesync_worker())
5838 ereport(LOG,
5839 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5842 else if (am_sequencesync_worker())
5843 ereport(LOG,
5844 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5846 else
5847 ereport(LOG,
5848 errmsg("logical replication apply worker for subscription \"%s\" has started",
5850
5852}
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:5167
static void apply_worker_exit(void)
Definition: worker.c:5004
MemoryContext ApplyContext
Definition: worker.c:472
static bool MySubscriptionValid
Definition: worker.c:480
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:890
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4196
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_SUSET
Definition: guc.h:78
@ PGC_BACKEND
Definition: guc.h:77
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
MemoryContext TopMemoryContext
Definition: mcxt.c:166
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)
static bool am_sequencesync_worker(void)

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LockSharedObject(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, Subscription::retaindeadtuples, Subscription::retentionactive, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, TransactionIdIsValid, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsIndexUsableForFindingDeletedTuple()

static bool IsIndexUsableForFindingDeletedTuple ( Oid  localindexoid,
TransactionId  conflict_detection_xmin 
)
static

Definition at line 3235 of file worker.c.

3237{
3238 HeapTuple index_tuple;
3239 TransactionId index_xmin;
3240
3241 index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3242
3243 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3244 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3245
3246 /*
3247 * No need to check for a frozen transaction ID, as
3248 * TransactionIdPrecedes() manages it internally, treating it as falling
3249 * behind the conflict_detection_xmin.
3250 */
3251 index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3252
3253 ReleaseSysCache(index_tuple);
3254
3255 return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3256}
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
Definition: htup_details.h:324
HeapTupleHeader t_data
Definition: htup.h:68
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References elog, ERROR, HeapTupleHeaderGetXmin(), HeapTupleIsValid, ObjectIdGetDatum(), ReleaseSysCache(), SearchSysCache1(), HeapTupleData::t_data, and TransactionIdPrecedes().

Referenced by FindDeletedTupleInLocalRel().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 6011 of file worker.c.

6012{
6013 return IsLogicalWorker() && am_parallel_apply_worker();
6014}
bool IsLogicalWorker(void)
Definition: worker.c:6002

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 6002 of file worker.c.

6003{
6004 return MyLogicalRepWorker != NULL;
6005}

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3981 of file worker.c.

3982{
3983 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3984 bool ping_sent = false;
3985 TimeLineID tli;
3986 ErrorContextCallback errcallback;
3987 RetainDeadTuplesData rdt_data = {0};
3988
3989 /*
3990 * Init the ApplyMessageContext which we clean up after each replication
3991 * protocol message.
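3992 */
3993 ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3994 "ApplyMessageContext",
3995 ALLOCSET_DEFAULT_SIZES);
3996
3997 /*
3998 * This memory context is used for per-stream data when the streaming mode
3999 * is enabled. This context is reset on each stream stop.
4000 */
4001 LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
4002 "LogicalStreamingContext",
4003 ALLOCSET_DEFAULT_SIZES);
4004
4005 /* mark as idle, before starting to loop */
4006 pgstat_report_activity(STATE_IDLE, NULL);
4007
4008 /*
4009 * Push apply error context callback. Fields will be filled while applying
4010 * a change.
4011 */
4012 errcallback.callback = apply_error_callback;
4013 errcallback.previous = error_context_stack;
4014 error_context_stack = &errcallback;
4015 apply_error_context_stack = error_context_stack;
4016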
4017 /* This outer loop iterates once per wait. */
4018 for (;;)
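4019 {
4020 pgsocket fd = PGINVALID_SOCKET;
4021 int rc;
4022 int len;
4023 char *buf = NULL;
4024 bool endofstream = false;
4025 long wait_time;
4026
4027 CHECK_FOR_INTERRUPTS();
4028
4029 MemoryContextSwitchTo(ApplyMessageContext);
4030
4031 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4032
4033 if (len != 0)
4034 {
4035 /* Loop to process all available data (without blocking). */
4036 for (;;)
4037 {
4038 CHECK_FOR_INTERRUPTS();
4039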
4040 if (len == 0)
4041 {
4042 break;
4043 }
4044 else if (len < 0)
4045 {
4046 ereport(LOG,
4047 (errmsg("data stream from publisher has ended")));
4048 endofstream = true;
4049 break;
4050 }
4051 else
4052 {
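4053 int c;
4054 StringInfoData s;
4055
4056 if (ConfigReloadPending)
4057 {
4058 ConfigReloadPending = false;
4059 ProcessConfigFile(PGC_SIGHUP);
4060 }
4061
4062 /* Reset timeout. */
4063 last_recv_timestamp = GetCurrentTimestamp();
4064 ping_sent = false;
4065
4066 rdt_data.last_recv_time = last_recv_timestamp;
4067
4068 /* Ensure we are reading the data into our memory context. */
4069 MemoryContextSwitchTo(ApplyMessageContext);
4070
4071 initReadOnlyStringInfo(&s, buf, len);
4072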
4073 c = pq_getmsgbyte(&s);
4074
4075 if (c == PqReplMsg_WALData)
4076 {
4077 XLogRecPtr start_lsn;
4078 XLogRecPtr end_lsn;
4079 TimestampTz send_time;
4080
4081 start_lsn = pq_getmsgint64(&s);
4082 end_lsn = pq_getmsgint64(&s);
4083 send_time = pq_getmsgint64(&s);
4084
4085 if (last_received < start_lsn)
4086 last_received = start_lsn;
4087
4088 if (last_received < end_lsn)
4089 last_received = end_lsn;
4090
4091 UpdateWorkerStats(last_received, send_time, false);
4092
4093 apply_dispatch(&s);
4094
4095 maybe_advance_nonremovable_xid(&rdt_data, false);
4096 }
4097 else if (c == PqReplMsg_Keepalive)
4098 {
4099 XLogRecPtr end_lsn;
4100 TimestampTz timestamp;
4101 bool reply_requested;
4102
4103 end_lsn = pq_getmsgint64(&s);
4104 timestamp = pq_getmsgint64(&s);
4105 reply_requested = pq_getmsgbyte(&s);
4106
4107 if (last_received < end_lsn)
4108 last_received = end_lsn;
4109
4110 send_feedback(last_received, reply_requested, false);
4111
4112 maybe_advance_nonremovable_xid(&rdt_data, false);
4113
4114 UpdateWorkerStats(last_received, timestamp, true);
4115 }
4116 else if (c == PqReplMsg_PrimaryStatusUpdate)
4117 {
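4118 rdt_data.remote_lsn = pq_getmsgint64(&s);
4119 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4120 rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4121 rdt_data.reply_time = pq_getmsgint64(&s);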
4122
4123 /*
4124 * This should never happen, see
4125 * ProcessStandbyPSRequestMessage. But if it happens
4126 * due to a bug, we don't want to proceed as it can
4127 * incorrectly advance oldest_nonremovable_xid.
4128 */
4129 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4130 elog(ERROR, "cannot get the latest WAL position from the publisher");
4131
4132 maybe_advance_nonremovable_xid(&rdt_data, true);
4133
4134 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4135 }
4136 /* other message types are purposefully ignored */
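4137
4138 MemoryContextReset(ApplyMessageContext);
4139 }
4140
4141 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4142 }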
4143 }
4144
4145 /* confirm all writes so far */
4146 send_feedback(last_received, false, false);
4147
4148 /* Reset the timestamp if no message was received */
4149 rdt_data.last_recv_time = 0;
4150
4151 maybe_advance_nonremovable_xid(&rdt_data, false);
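4152
4153 if (!in_remote_transaction && !in_streamed_transaction)
4154 {
4155 /*
4156 * If we didn't get any transactions for a while there might be
4157 * unconsumed invalidation messages in the queue, consume them
4158 * now.
4159 */
4160 AcceptInvalidationMessages();
4161 maybe_reread_subscription();
4162
4163 /*
4164 * Process any relations that are being synchronized in parallel
4165 * and any newly added tables or sequences.
4166 */
4167 ProcessSyncingRelations(last_received);
4168 }
4169
4170 /* Cleanup the memory. */
4171 MemoryContextReset(ApplyMessageContext);
4172 MemoryContextSwitchTo(TopMemoryContext);
4173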
4174 /* Check if we need to exit the streaming loop. */
4175 if (endofstream)
4176 break;
4177
4178 /*
4179 * Wait for more data or latch. If we have unflushed transactions,
4180 * wake up after WalWriterDelay to see if they've been flushed yet (in
4181 * which case we should send a feedback message). Otherwise, there's
4182 * no particular urgency about waking up unless we get data or a
4183 * signal.
4184 */
4185 if (!dlist_is_empty(&lsn_mapping))
4186 wait_time = WalWriterDelay;
4187 else
4188 wait_time = NAPTIME_PER_CYCLE;
4189
4190 /*
4191 * Ensure to wake up when it's possible to advance the non-removable
4192 * transaction ID, or when the retention duration may have exceeded
4193 * max_retention_duration.
4194 */
4195 if (MySubscription->retentionactive)
4196 {
4197 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4198 rdt_data.xid_advance_interval)
4199 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4200 else if (MySubscription->maxretention > 0)
4201 wait_time = Min(wait_time, MySubscription->maxretention);
4202 }
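4203
4204 rc = WaitLatchOrSocket(MyLatch,
4205 WL_SOCKET_READABLE | WL_LATCH_SET |
4206 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
4207 fd, wait_time,
4208 WAIT_EVENT_LOGICAL_APPLY_MAIN);
4209
4210 if (rc & WL_LATCH_SET)
4211 {
4212 ResetLatch(MyLatch);
4213 CHECK_FOR_INTERRUPTS();
4214 }
4215
4216 if (ConfigReloadPending)
4217 {
4218 ConfigReloadPending = false;
4219 ProcessConfigFile(PGC_SIGHUP);
4220 }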
4221
4222 if (rc & WL_TIMEOUT)
4223 {
4224 /*
4225 * We didn't receive anything new. If we haven't heard anything
4226 * from the server for more than wal_receiver_timeout / 2, ping
4227 * the server. Also, if it's been longer than
4228 * wal_receiver_status_interval since the last update we sent,
4229 * send a status update to the primary anyway, to report any
4230 * progress in applying WAL.
4231 */
4232 bool requestReply = false;
4233
4234 /*
4235 * Check if time since last receive from primary has reached the
4236 * configured limit.
4237 */
4238 if (wal_receiver_timeout > 0)
4239 {
4240 TimestampTz now = GetCurrentTimestamp();
4241 TimestampTz timeout;
4242
4243 timeout =
4244 TimestampTzPlusMilliseconds(last_recv_timestamp,
4245 wal_receiver_timeout);
4246
4247 if (now >= timeout)
4248 ereport(ERROR,
4249 (errcode(ERRCODE_CONNECTION_FAILURE),
4250 errmsg("terminating logical replication worker due to timeout")));
4251
4252 /* Check to see if it's time for a ping. */
4253 if (!ping_sent)
4254 {
4255 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4256 (wal_receiver_timeout / 2));
4257 if (now >= timeout)
4258 {
4259 requestReply = true;
4260 ping_sent = true;
4261 }
4262 }
4263 }
4264
4265 send_feedback(last_received, requestReply, requestReply);
4266
4267 maybe_advance_nonremovable_xid(&rdt_data, false);
4268
4269 /*
4270 * Force reporting to ensure long idle periods don't lead to
4271 * arbitrarily delayed stats. Stats can only be reported outside
4272 * of (implicit or explicit) transactions. That shouldn't lead to
4273 * stats being delayed for long, because transactions are either
4274 * sent as a whole on commit or streamed. Streamed transactions
4275 * are spilled to disk and applied on commit.
4276 */
4277 if (!IsTransactionState())
4278 pgstat_report_stat(true);
4279 }
4280 }
4281
4282 /* Pop the error context stack */
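4283 error_context_stack = errcallback.previous;
4284 apply_error_context_stack = error_context_stack;
4285
4286 /* All done */
4287 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
4288}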
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3965
#define NAPTIME_PER_CYCLE
Definition: worker.c:299
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:469
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:4297
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4387
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
void apply_error_callback(void *arg)
Definition: worker.c:6159
static MemoryContext LogicalStreamingContext
Definition: worker.c:475
uint64_t uint64
Definition: c.h:542
ErrorContextCallback * error_context_stack
Definition: elog.c:95
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:223
void ResetLatch(Latch *latch)
Definition: latch.c:374
static char * buf
Definition: pg_test_fsync.c:72
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
char * c
#define PqReplMsg_WALData
Definition: protocol.h:77
#define PqReplMsg_Keepalive
Definition: protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition: protocol.h:76
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
FullTransactionId remote_oldestxid
Definition: worker.c:412
FullTransactionId remote_nextxid
Definition: worker.c:419
XLogRecPtr remote_lsn
Definition: worker.c:404
TimestampTz reply_time
Definition: worker.c:421
static FullTransactionId FullTransactionIdFromU64(uint64 value)
Definition: transam.h:81
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
int wal_receiver_timeout
Definition: walreceiver.c:89
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
int WalWriterDelay
Definition: walwriter.c:70
uint32 TimeLineID
Definition: xlogdefs.h:63

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), elog, ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), FullTransactionIdFromU64(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), RetainDeadTuplesData::last_recv_time, len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, Subscription::maxretention, maybe_advance_nonremovable_xid(), maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), Min, MyLatch, MySubscription, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), RetainDeadTuplesData::phase, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_PrimaryStatusUpdate, PqReplMsg_WALData, ErrorContextCallback::previous, ProcessConfigFile(), ProcessSyncingRelations(), RDT_GET_CANDIDATE_XID, RetainDeadTuplesData::remote_lsn, RetainDeadTuplesData::remote_nextxid, RetainDeadTuplesData::remote_oldestxid, RetainDeadTuplesData::reply_time, ResetLatch(), Subscription::retentionactive, send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, RetainDeadTuplesData::xid_advance_interval, and XLogRecPtrIsValid.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_advance_nonremovable_xid()

static void maybe_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4387 of file worker.c.

4389{
4390 if (!can_advance_nonremovable_xid(rdt_data))
4391 return;
4392
4393 process_rdt_phase_transition(rdt_data, status_received);
4394}
static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4401

References can_advance_nonremovable_xid(), and process_rdt_phase_transition().

Referenced by LogicalRepApplyLoop().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 5038 of file worker.c.

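5039{
5040 MemoryContext oldctx;
5041 Subscription *newsub;
5042 bool started_tx = false;
5043
5044 /* When cache state is valid there is nothing to do here. */
5045 if (MySubscriptionValid)
5046 return;
5047
5048 /* This function might be called inside or outside of transaction. */
5049 if (!IsTransactionState())
5050 {
5051 StartTransactionCommand();
5052 started_tx = true;
5053 }
5054
5055 /* Ensure allocations in permanent context. */
5056 oldctx = MemoryContextSwitchTo(ApplyContext);
5057
5058 newsub = GetSubscription(MyLogicalRepWorker->subid, true);
5059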
5060 /*
5061 * Exit if the subscription was removed. This normally should not happen
5062 * as the worker gets killed during DROP SUBSCRIPTION.
5063 */
5064 if (!newsub)
5065 {
5066 ereport(LOG,
5067 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5068 MySubscription->name)));
5069
5070 /* Ensure we remove no-longer-useful entry for worker's start time */
5071 if (am_leader_apply_worker())
5072 ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5073
5074 proc_exit(0);
5075 }
5076
5077 /* Exit if the subscription was disabled. */
5078 if (!newsub->enabled)
5079 {
5080 ereport(LOG,
5081 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5082 MySubscription->name)));
5083
5084 apply_worker_exit();
5085 }
5086
5087 /* !slotname should never happen when enabled is true. */
5088 Assert(newsub->slotname);
5089
5090 /* two-phase cannot be altered while the worker is running */
5091 Assert(newsub->twophasestate == MySubscription->twophasestate);
5092
5093 /*
5094 * Exit if any parameter that affects the remote connection was changed.
5095 * The launcher will start a new worker but note that the parallel apply
5096 * worker won't restart if the streaming option's value is changed from
5097 * 'parallel' to any other value or the server decides not to stream the
5098 * in-progress transaction.
5099 */
5100 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5101 strcmp(newsub->name, MySubscription->name) != 0 ||
5102 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5103 newsub->binary != MySubscription->binary ||
5104 newsub->stream != MySubscription->stream ||
5105 newsub->passwordrequired != MySubscription->passwordrequired ||
5106 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5107 newsub->owner != MySubscription->owner ||
5108 !equal(newsub->publications, MySubscription->publications))
5109 {
5110 if (am_parallel_apply_worker())
5111 ereport(LOG,
5112 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5113 MySubscription->name)));
5114 else
5115 ereport(LOG,
5116 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5117 MySubscription->name)));
5118
5119 apply_worker_exit();
5120 }
5121
5122 /*
5123 * Exit if the subscription owner's superuser privileges have been
5124 * revoked.
5125 */
5126 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5127 {
5128 if (am_parallel_apply_worker())
5129 ereport(LOG,
5130 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5131 MySubscription->name));
5132 else
5133 ereport(LOG,
5134 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5135 MySubscription->name));
5136
5137 apply_worker_exit();
5138 }
5139
5140 /* Check for other changes that should never happen too. */
5141 if (newsub->dbid != MySubscription->dbid)
5142 {
5143 elog(ERROR, "subscription %u changed unexpectedly",
5144 MyLogicalRepWorker->subid);
5145 }
5146
5147 /* Clean old subscription info and switch to new one. */
5148 FreeSubscription(MySubscription);
5149 MySubscription = newsub;
5150
5151 MemoryContextSwitchTo(oldctx);
5152
5153 /* Change synchronous commit according to the user's wishes */
5154 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5155 PGC_BACKEND, PGC_S_OVERRIDE);
5156
5157 if (started_tx)
5158 CommitTransactionCommand();
5159
5160 MySubscriptionValid = true;
5161}
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 6021 of file worker.c.

6022{
6023 Assert(!is_skipping_changes());
6024 Assert(!in_remote_transaction);
6025 Assert(!in_streamed_transaction);
6026
6027 /*
6028 * Quick return if it's not requested to skip this transaction. This
6029 * function is called for every remote transaction and we assume that
6030 * skipping the transaction is not used often.
6031 */
6032 if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
6033 MySubscription->skiplsn != finish_lsn))
6034 return;
6035
6036 /* Start skipping all changes of this transaction */
6037 skip_xact_finish_lsn = finish_lsn;
6038
6039 ereport(LOG,
6040 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6041 LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6042}
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:516

References Assert(), ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsValid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ process_rdt_phase_transition()

static void process_rdt_phase_transition ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4423 of file worker.c.

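4425{
4426 switch (rdt_data->phase)
4427 {
4428 case RDT_GET_CANDIDATE_XID:
4429 get_candidate_xid(rdt_data);
4430 break;
4431 case RDT_REQUEST_PUBLISHER_STATUS:
4432 request_publisher_status(rdt_data);
4433 break;
4434 case RDT_WAIT_FOR_PUBLISHER_STATUS:
4435 wait_for_publisher_status(rdt_data, status_received);
4436 break;
4437 case RDT_WAIT_FOR_LOCAL_FLUSH:
4438 wait_for_local_flush(rdt_data);
4439 break;
4440 case RDT_STOP_CONFLICT_INFO_RETENTION:
4441 stop_conflict_info_retention(rdt_data);
4442 break;
4443 case RDT_RESUME_CONFLICT_INFO_RETENTION:
4444 resume_conflict_info_retention(rdt_data);
4445 break;
4446 }
4447}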
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4614
static void get_candidate_xid(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4453
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4555
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4516
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4842
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4804

References get_candidate_xid(), RetainDeadTuplesData::phase, RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, request_publisher_status(), resume_conflict_info_retention(), stop_conflict_info_retention(), wait_for_local_flush(), and wait_for_publisher_status().

Referenced by get_candidate_xid(), maybe_advance_nonremovable_xid(), wait_for_local_flush(), and wait_for_publisher_status().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 641 of file worker.c.

643{
644 if (OidIsValid(relid))
645 {
646 /* Replication origin name for tablesync workers. */
647 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 }
649 else
650 {
651 /* Replication origin name for non-tablesync workers. */
652 snprintf(originname, szoriginname, "pg_%u", suboid);
653 }
654}

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), run_apply_worker(), and run_tablesync_worker().

◆ replorigin_reset()

static void replorigin_reset ( int  code,
Datum  arg 
)
static

◆ request_publisher_status()

static void request_publisher_status ( RetainDeadTuplesData rdt_data)
static

Definition at line 4516 of file worker.c.

4517{
4518 static StringInfo request_message = NULL;
4519
4520 if (!request_message)
4521 {
4522 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4523
4524 request_message = makeStringInfo();
4525 MemoryContextSwitchTo(oldctx);
4526 }
4527 else
4528 resetStringInfo(request_message);
4529
4530 /*
4531 * Send the current time to update the remote walsender's latest reply
4532 * message received time.
4533 */
4534 pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4535 pq_sendint64(request_message, GetCurrentTimestamp());
4536
4537 elog(DEBUG2, "sending publisher status request message");
4538
4539 /* Send a request for the publisher status */
4540 walrcv_send(LogRepWorkerWalRcvConn,
4541 request_message->data, request_message->len);
4542
4543 rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4544
4545 /*
4546 * Skip calling maybe_advance_nonremovable_xid() since further transition
4547 * is possible only once we receive the publisher status message.
4548 */
4549}
#define DEBUG2
Definition: elog.h:29
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
#define PqReplMsg_PrimaryStatusRequest
Definition: protocol.h:83
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457

References ApplyContext, StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, LogRepWorkerWalRcvConn, makeStringInfo(), MemoryContextSwitchTo(), RetainDeadTuplesData::phase, pq_sendbyte(), pq_sendint64(), PqReplMsg_PrimaryStatusRequest, RDT_WAIT_FOR_PUBLISHER_STATUS, resetStringInfo(), and walrcv_send.

Referenced by process_rdt_phase_transition().

◆ reset_apply_error_context_info()

◆ reset_retention_data_fields()

◆ resume_conflict_info_retention()

static void resume_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4842 of file worker.c.

4843{
4844 /* We can't resume retention without updating retention status. */
4845 if (!update_retention_status(true))
4846 return;
4847
4848 ereport(LOG,
4849 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4850 MySubscription->name),
4851 MySubscription->maxretention
4852 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4853 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4854
4855 /*
4856 * Restart the worker to let the launcher initialize
4857 * oldest_nonremovable_xid at startup.
4858 *
4859 * While it's technically possible to derive this value on-the-fly using
4860 * the conflict detection slot's xmin, doing so risks a race condition:
4861 * the launcher might clean slot.xmin just after retention resumes. This
4862 * would make oldest_nonremovable_xid unreliable, especially during xid
4863 * wraparound.
4864 *
4865 * Although this can be prevented by introducing heavy weight locking, the
4866 * complexity it will bring doesn't seem worthwhile given how rarely
4867 * retention is resumed.
4868 */
4869 apply_worker_exit();
4870}
static bool update_retention_status(bool active)
Definition: worker.c:4882

References apply_worker_exit(), ereport, errdetail(), errmsg(), LOG, Subscription::maxretention, MySubscription, Subscription::name, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ run_apply_worker()

static void run_apply_worker ( void  )
static

Definition at line 5624 of file worker.c.

5625{
5626 char originname[NAMEDATALEN];
5627 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5628 char *slotname = NULL;
5629 WalRcvStreamOptions options;
5630 RepOriginId originid;
5631 TimeLineID startpointTLI;
5632 char *err;
5633 bool must_use_password;
5634
5635 slotname = MySubscription->slotname;
5636
5637 /*
5638 * This shouldn't happen if the subscription is enabled, but guard against
5639 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5640 * slot is NULL.)
5641 */
5642 if (!slotname)
5643 ereport(ERROR,
5644 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5645 errmsg("subscription has no replication slot set")));
5646
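5647 /* Setup replication origin tracking. */
5648 ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5649 originname, sizeof(originname));
5650 StartTransactionCommand();
5651 originid = replorigin_by_name(originname, true);
5652 if (!OidIsValid(originid))
5653 originid = replorigin_create(originname);
5654 replorigin_session_setup(originid, 0);
5655 replorigin_session_origin = originid;
5656 origin_startpos = replorigin_session_get_progress(false);
5657 CommitTransactionCommand();
5658
5659 /* Is the use of a password mandatory? */
5660 must_use_password = MySubscription->passwordrequired &&
5661 !MySubscription->ownersuperuser;
5662
5663 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5664 true, must_use_password,
5665 MySubscription->name, &err);
5666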
5667 if (LogRepWorkerWalRcvConn == NULL)
5668 ereport(ERROR,
5669 (errcode(ERRCODE_CONNECTION_FAILURE),
5670 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5671 MySubscription->name, err)));
5672
5673 /*
5674 * We don't really use the output identify_system for anything but it does
5675 * some initializations on the upstream so let's still call it.
5676 */
5677 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5678
5679 set_apply_error_context_origin(originname);
5680
5681 set_stream_options(&options, slotname, &origin_startpos);
5682
5683 /*
5684 * Even when the two_phase mode is requested by the user, it remains as
5685 * the tri-state PENDING until all tablesyncs have reached READY state.
5686 * Only then, can it become ENABLED.
5687 *
5688 * Note: If the subscription has no tables then leave the state as
5689 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5690 * work.
5691 */
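5692 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5693 AllTablesyncsReady())
5694 {
5695 /* Start streaming with two_phase enabled */
5696 options.proto.logical.twophase = true;
5697 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5698
5699 StartTransactionCommand();
5700
5701 /*
5702 * Updating pg_subscription might involve TOAST table access, so
5703 * ensure we have a valid snapshot.
5704 */
5705 PushActiveSnapshot(GetTransactionSnapshot());
5706
5707 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5708 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5709 PopActiveSnapshot();
5710 CommitTransactionCommand();
5711 }
5712 else
5713 {
5714 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5715 }
5716
5717 ereport(DEBUG1,
5718 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5719 MySubscription->name,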
5720 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5721 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5722 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5723 "?")));
5724
5725 /* Run the main loop. */
5726 start_apply(origin_startpos);
5727}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5514
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5583
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6301
void err(int eval, const char *fmt,...)
Definition: err.c:43
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1119
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1272
#define NAMEDATALEN
static char ** options
bool AllTablesyncsReady(void)
Definition: tablesync.c:1600
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1651
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
uint16 RepOriginId
Definition: xlogdefs.h:69

References AllTablesyncsReady(), CommitTransactionCommand(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, GetTransactionSnapshot(), InvalidOid, InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::ownersuperuser, Subscription::passwordrequired, PopActiveSnapshot(), PushActiveSnapshot(), ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), set_apply_error_context_origin(), set_stream_options(), Subscription::slotname, start_apply(), StartTransactionCommand(), Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by ApplyWorkerMain().

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 4297 of file worker.c.

4298{
4299 static StringInfo reply_message = NULL;
4300 static TimestampTz send_time = 0;
4301
4302 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4303 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4304
4305 XLogRecPtr writepos;
4306 XLogRecPtr flushpos;
4307 TimestampTz now;
4308 bool have_pending_txes;
4309
4310 /*
4311 * If the user doesn't want status to be reported to the publisher, be
4312 * sure to exit before doing anything at all.
4313 */
4314 if (!force && wal_receiver_status_interval <= 0)
4315 return;
4316
4317 /* It's legal to not pass a recvpos */
4318 if (recvpos < last_recvpos)
4319 recvpos = last_recvpos;
4320
4321 get_flush_position(&writepos, &flushpos, &have_pending_txes);
4322
4323 /*
4324 * No outstanding transactions to flush, we can report the latest received
4325 * position. This is important for synchronous replication.
4326 */
4327 if (!have_pending_txes)
4328 flushpos = writepos = recvpos;
4329
4330 if (writepos < last_writepos)
4331 writepos = last_writepos;
4332
4333 if (flushpos < last_flushpos)
4334 flushpos = last_flushpos;
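4335
4336 now = GetCurrentTimestamp();
4337
4338 /* if we've already reported everything we're good */
4339 if (!force &&
4340 writepos == last_writepos &&
4341 flushpos == last_flushpos &&
4342 !TimestampDifferenceExceeds(send_time, now,
4343 wal_receiver_status_interval * 1000))
4344 return;
4345 send_time = now;
4346
4347 if (!reply_message)
4348 {
4349 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4350
4351 reply_message = makeStringInfo();
4352 MemoryContextSwitchTo(oldctx);
4353 }
4354 else
4355 resetStringInfo(reply_message);
4356
4357 pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);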
4358 pq_sendint64(reply_message, recvpos); /* write */
4359 pq_sendint64(reply_message, flushpos); /* flush */
4360 pq_sendint64(reply_message, writepos); /* apply */
4361 pq_sendint64(reply_message, now); /* sendTime */
4362 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4363
4364 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4365 force,
4366 LSN_FORMAT_ARGS(recvpos),
4367 LSN_FORMAT_ARGS(writepos),
4368 LSN_FORMAT_ARGS(flushpos));
4369
4370 walrcv_send(LogRepWorkerWalRcvConn,
4371 reply_message->data, reply_message->len);
4372
4373 if (recvpos > last_recvpos)
4374 last_recvpos = recvpos;
4375 if (writepos > last_writepos)
4376 last_writepos = writepos;
4377 if (flushpos > last_flushpos)
4378 last_flushpos = flushpos;
4379}
static XLogRecPtr last_flushpos
Definition: worker.c:527
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3895
#define PqReplMsg_StandbyStatusUpdate
Definition: protocol.h:84
static StringInfoData reply_message
Definition: walreceiver.c:132

References ApplyContext, StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, last_flushpos, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 6301 of file worker.c.

6302{
6304 originname);
6305}
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_xact()

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions *  options,
char *  slotname,
XLogRecPtr *  origin_startpos 
)

Definition at line 5514 of file worker.c.

5517{
5518 int server_version;
5519
5520 options->logical = true;
5521 options->startpoint = *origin_startpos;
5522 options->slotname = slotname;
5523
5525 options->proto.logical.proto_version =
5530
5531 options->proto.logical.publication_names = MySubscription->publications;
5532 options->proto.logical.binary = MySubscription->binary;
5533
5534 /*
5535 * Assign the appropriate option value for streaming option according to
5536 * the 'streaming' mode and the publisher's ability to support that mode.
5537 */
5538 if (server_version >= 160000 &&
5539 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5540 {
5541 options->proto.logical.streaming_str = "parallel";
5543 }
5544 else if (server_version >= 140000 &&
5545 MySubscription->stream != LOGICALREP_STREAM_OFF)
5546 {
5547 options->proto.logical.streaming_str = "on";
5549 }
5550 else
5551 {
5552 options->proto.logical.streaming_str = NULL;
5554 }
5555
5556 options->proto.logical.twophase = false;
5557 options->proto.logical.origin = pstrdup(MySubscription->origin);
5558}
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static int server_version
Definition: pg_dumpall.c:109
#define walrcv_server_version(conn)
Definition: walreceiver.h:447

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 5869 of file worker.c.

5870{
5871 /* Attach to slot */
5872 logicalrep_worker_attach(worker_slot);
5873
5875
5876 /* Setup signal handling */
5878 pqsignal(SIGTERM, die);
5880
5881 /*
5882 * We don't currently need any ResourceOwner in a walreceiver process, but
5883 * if we did, we could call CreateAuxProcessResourceOwner here.
5884 */
5885
5886 /* Initialise stats to a sanish value */
5889
5890 /* Load the libpq-specific functions */
5891 load_file("libpqwalreceiver", false);
5892
5894
5895 /*
5896 * Register a callback to reset the origin state before aborting any
5897 * pending transaction during shutdown (see ShutdownPostgres()). This will
5898 * avoid origin advancement for an incomplete transaction which could
5899 * otherwise lead to its loss as such a transaction won't be sent by the
5900 * server again.
5901 *
5902 * Note that even a LOG or DEBUG statement placed after setting the origin
5903 * state may process a shutdown signal before committing the current apply
5904 * operation. So, it is important to register such a callback here.
5905 */
5907
5908 /* Connect to the origin and start the replication. */
5909 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5911
5912 /*
5913 * Setup callback for syscache so that we know when something changes in
5914 * the subscription relation state.
5915 */
5916 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5918 (Datum) 0);
5919}
static void replorigin_reset(int code, Datum arg)
Definition: worker.c:5858
void InitializeLogRepWorker(void)
Definition: worker.c:5737
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:930
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void logicalrep_worker_attach(int slot)
Definition: launcher.c:757
#define die(msg)
#define pqsignal
Definition: port.h:552
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
Definition: syncutils.c:101
#define SIGHUP
Definition: win32_port.h:158

References am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog, GetCurrentTimestamp(), InitializeLogRepWorker(), InvalidateSyncingRelStates(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal, replorigin_reset(), LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry *  rel)
static

Definition at line 681 of file worker.c.

682{
683 switch (MyLogicalRepWorker->type)
684 {
686 return MyLogicalRepWorker->relid == rel->localreloid;
687
689 /* We don't synchronize rels that are in an unknown state. */
690 if (rel->state != SUBREL_STATE_READY &&
691 rel->state != SUBREL_STATE_UNKNOWN)
693 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
694 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
696 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
697
698 return rel->state == SUBREL_STATE_READY;
699
700 case WORKERTYPE_APPLY:
701 return (rel->state == SUBREL_STATE_READY ||
702 (rel->state == SUBREL_STATE_SYNCDONE &&
703 rel->statelsn <= remote_final_lsn));
704
706 /* Should never happen. */
707 elog(ERROR, "sequence synchronization worker is not expected to apply changes");
708 break;
709
711 /* Should never happen. */
712 elog(ERROR, "Unknown worker type");
713 }
714
715 return false; /* dummy for compiler */
716}
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY

References elog, ereport, errcode(), errdetail(), errmsg(), ERROR, LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ should_stop_conflict_info_retention()

static bool should_stop_conflict_info_retention ( RetainDeadTuplesData *  rdt_data)
static

Definition at line 4769 of file worker.c.

4770{
4772
4775 rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4776
4778 return false;
4779
4780 /*
4781 * Use last_recv_time when applying changes in the loop to avoid
4782 * unnecessary system time retrieval. If last_recv_time is not available,
4783 * obtain the current timestamp.
4784 */
4785 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4786
4787 /*
4788 * Return early if the wait time has not exceeded the configured maximum
4789 * (max_retention_duration). Time spent waiting for table synchronization
4790 * is excluded from this calculation, as it occurs infrequently.
4791 */
4794 rdt_data->table_sync_wait_time))
4795 return false;
4796
4797 return true;
4798}

References Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, GetCurrentTimestamp(), RetainDeadTuplesData::last_recv_time, Subscription::maxretention, MySubscription, now(), RetainDeadTuplesData::phase, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, RetainDeadTuplesData::table_sync_wait_time, TimestampDifferenceExceeds(), and TransactionIdIsValid.

Referenced by wait_for_local_flush(), and wait_for_publisher_status().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry *  rel,
EState *  estate,
TupleTableSlot *  slot 
)
static

Definition at line 959 of file worker.c.

961{
963 int num_phys_attrs = desc->natts;
964 int i;
965 int attnum,
966 num_defaults = 0;
967 int *defmap;
968 ExprState **defexprs;
969 ExprContext *econtext;
970
971 econtext = GetPerTupleExprContext(estate);
972
973 /* We got all the data via replication, no need to evaluate anything. */
974 if (num_phys_attrs == rel->remoterel.natts)
975 return;
976
977 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
978 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
979
980 Assert(rel->attrmap->maplen == num_phys_attrs);
981 for (attnum = 0; attnum < num_phys_attrs; attnum++)
982 {
984 Expr *defexpr;
985
986 if (cattr->attisdropped || cattr->attgenerated)
987 continue;
988
989 if (rel->attrmap->attnums[attnum] >= 0)
990 continue;
991
992 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
993
994 if (defexpr != NULL)
995 {
996 /* Run the expression through planner */
997 defexpr = expression_planner(defexpr);
998
999 /* Initialize executable expression in copycontext */
1000 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1001 defmap[num_defaults] = attnum;
1002 num_defaults++;
1003 }
1004 }
1005
1006 for (i = 0; i < num_defaults; i++)
1007 slot->tts_values[defmap[i]] =
1008 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1009}
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:143
#define GetPerTupleExprContext(estate)
Definition: executor.h:656
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:393
int16 attnum
Definition: pg_attribute.h:74
Expr * expression_planner(Expr *expr)
Definition: planner.c:6763
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool attgenerated
Definition: tupdesc.h:78
bool * tts_isnull
Definition: tuptable.h:126
Datum * tts_values
Definition: tuptable.h:124

References Assert(), CompactAttribute::attgenerated, CompactAttribute::attisdropped, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescCompactAttr().

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot *  slot,
TupleTableSlot *  srcslot,
LogicalRepRelMapEntry *  rel,
LogicalRepTupleData *  tupleData 
)
static

Definition at line 1118 of file worker.c.

1121{
1122 int natts = slot->tts_tupleDescriptor->natts;
1123 int i;
1124
1125 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1126 ExecClearTuple(slot);
1127
1128 /*
1129 * Copy all the column data from srcslot, so that we'll have valid values
1130 * for unreplaced columns.
1131 */
1132 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1133 slot_getallattrs(srcslot);
1134 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1135 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1136
1137 /* Call the "in" function for each replaced attribute */
1138 Assert(natts == rel->attrmap->maplen);
1139 for (i = 0; i < natts; i++)
1140 {
1142 int remoteattnum = rel->attrmap->attnums[i];
1143
1144 if (remoteattnum < 0)
1145 continue;
1146
1147 Assert(remoteattnum < tupleData->ncols);
1148
1149 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1150 {
1151 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1152
1153 /* Set attnum for error callback */
1155
1156 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1157 {
1158 Oid typinput;
1159 Oid typioparam;
1160
1161 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1162 slot->tts_values[i] =
1163 OidInputFunctionCall(typinput, colvalue->data,
1164 typioparam, att->atttypmod);
1165 slot->tts_isnull[i] = false;
1166 }
1167 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1168 {
1169 Oid typreceive;
1170 Oid typioparam;
1171
1172 /*
1173 * In some code paths we may be asked to re-parse the same
1174 * tuple data. Reset the StringInfo's cursor so that works.
1175 */
1176 colvalue->cursor = 0;
1177
1178 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1179 slot->tts_values[i] =
1180 OidReceiveFunctionCall(typreceive, colvalue,
1181 typioparam, att->atttypmod);
1182
1183 /* Trouble if it didn't eat the whole buffer */
1184 if (colvalue->cursor != colvalue->len)
1185 ereport(ERROR,
1186 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1187 errmsg("incorrect binary data format in logical replication column %d",
1188 remoteattnum + 1)));
1189 slot->tts_isnull[i] = false;
1190 }
1191 else
1192 {
1193 /* must be LOGICALREP_COLUMN_NULL */
1194 slot->tts_values[i] = (Datum) 0;
1195 slot->tts_isnull[i] = true;
1196 }
1197
1198 /* Reset attnum for error callback */
1200 }
1201 }
1202
1203 /* And finally, declare that "slot" contains a valid virtual tuple */
1205}
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1741
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1754
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:3041
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:3107
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
StringInfoData * colvalues
Definition: logicalproto.h:87
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot *  slot,
LogicalRepRelMapEntry *  rel,
LogicalRepTupleData *  tupleData 
)
static

Definition at line 1017 of file worker.c.

1019{
1020 int natts = slot->tts_tupleDescriptor->natts;
1021 int i;
1022
1023 ExecClearTuple(slot);
1024
1025 /* Call the "in" function for each non-dropped, non-null attribute */
1026 Assert(natts == rel->attrmap->maplen);
1027 for (i = 0; i < natts; i++)
1028 {
1030 int remoteattnum = rel->attrmap->attnums[i];
1031
1032 if (!att->attisdropped && remoteattnum >= 0)
1033 {
1034 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1035
1036 Assert(remoteattnum < tupleData->ncols);
1037
1038 /* Set attnum for error callback */
1040
1041 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1042 {
1043 Oid typinput;
1044 Oid typioparam;
1045
1046 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1047 slot->tts_values[i] =
1048 OidInputFunctionCall(typinput, colvalue->data,
1049 typioparam, att->atttypmod);
1050 slot->tts_isnull[i] = false;
1051 }
1052 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1053 {
1054 Oid typreceive;
1055 Oid typioparam;
1056
1057 /*
1058 * In some code paths we may be asked to re-parse the same
1059 * tuple data. Reset the StringInfo's cursor so that works.
1060 */
1061 colvalue->cursor = 0;
1062
1063 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1064 slot->tts_values[i] =
1065 OidReceiveFunctionCall(typreceive, colvalue,
1066 typioparam, att->atttypmod);
1067
1068 /* Trouble if it didn't eat the whole buffer */
1069 if (colvalue->cursor != colvalue->len)
1070 ereport(ERROR,
1071 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1072 errmsg("incorrect binary data format in logical replication column %d",
1073 remoteattnum + 1)));
1074 slot->tts_isnull[i] = false;
1075 }
1076 else
1077 {
1078 /*
1079 * NULL value from remote. (We don't expect to see
1080 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1081 * NULL.)
1082 */
1083 slot->tts_values[i] = (Datum) 0;
1084 slot->tts_isnull[i] = true;
1085 }
1086
1087 /* Reset attnum for error callback */
1089 }
1090 else
1091 {
1092 /*
1093 * We assign NULL to dropped attributes and missing values
1094 * (missing values should be later filled using
1095 * slot_fill_defaults).
1096 */
1097 slot->tts_values[i] = (Datum) 0;
1098 slot->tts_isnull[i] = true;
1099 }
1100 }
1101
1103}

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_tuple_routing(), apply_handle_update(), and apply_handle_update_internal().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 5583 of file worker.c.

5584{
5585 PG_TRY();
5586 {
5587 LogicalRepApplyLoop(origin_startpos);
5588 }
5589 PG_CATCH();
5590 {
5591 /*
5592 * Reset the origin state to prevent the advancement of origin
5593 * progress if we fail to apply. Otherwise, this will result in
5594 * transaction loss as that transaction won't be sent again by the
5595 * server.
5596 */
5597 replorigin_reset(0, (Datum) 0);
5598
5601 else
5602 {
5603 /*
5604 * Report the worker failed while applying changes. Abort the
5605 * current transaction so that the stats message is sent in an
5606 * idle state.
5607 */
5611
5612 PG_RE_THROW();
5613 }
5614 }
5615 PG_END_TRY();
5616}
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3981
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382

References AbortOutOfAnyTransaction(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MyLogicalRepWorker, MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), replorigin_reset(), and LogicalRepWorker::type.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ stop_conflict_info_retention()

static void stop_conflict_info_retention ( RetainDeadTuplesData *  rdt_data)
static

Definition at line 4804 of file worker.c.

4805{
4806 /* Stop retention if it has not been stopped yet */
4808 {
4809 /*
4810 * If the retention status cannot be updated (e.g., due to active
4811 * transaction), skip further processing to avoid inconsistent
4812 * retention behavior.
4813 */
4814 if (!update_retention_status(false))
4815 return;
4816
4820
4821 ereport(LOG,
4822 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4824 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4825 }
4826
4828
4829 /*
4830 * If retention has been stopped, reset to the initial phase to retry
4831 * resuming retention. This reset is required to recalculate the current
4832 * wait time and resume retention if the time falls within
4833 * max_retention_duration.
4834 */
4836}
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4921

References Assert(), ereport, errdetail(), errmsg(), InvalidTransactionId, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::relmutex, reset_retention_data_fields(), Subscription::retentionactive, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 6048 of file worker.c.

6049{
6050 if (!is_skipping_changes())
6051 return;
6052
6053 ereport(LOG,
6054 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6056
6057 /* Stop skipping changes */
6059}

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3939 of file worker.c.

3940{
3941 FlushPosition *flushpos;
3942
3943 /*
3944 * Skip for parallel apply workers, because the lsn_mapping is maintained
3945 * by the leader apply worker.
3946 */
3948 return;
3949
3950 /* Need to do this in permanent context */
3952
3953 /* Track commit lsn */
3954 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3955 flushpos->local_end = local_lsn;
3956 flushpos->remote_end = remote_lsn;
3957
3958 dlist_push_tail(&lsn_mapping, &flushpos->node);
3960}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:303

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_abort_internal()

static void stream_abort_internal ( TransactionId  xid,
TransactionId  subxid 
)
static

Definition at line 1988 of file worker.c.

1989{
1990 /*
1991 * If the two XIDs are the same, it's in fact an abort of the toplevel xact, so
1992 * just delete the files with serialized info.
1993 */
1994 if (xid == subxid)
1996 else
1997 {
1998 /*
1999 * OK, so it's a subxact. We need to read the subxact file for the
2000 * toplevel transaction, determine the offset tracked for the subxact,
2001 * and truncate the file with changes. We also remove the subxacts
2002 * with higher offsets (or rather higher XIDs).
2003 *
2004 * We intentionally scan the array from the tail, because we're likely
2005 * aborting a change for the most recent subtransactions.
2006 *
2007 * We can't use the binary search here as subxact XIDs won't
2008 * necessarily arrive in sorted order, consider the case where we have
2009 * released the savepoint for multiple subtransactions and then
2010 * performed rollback to savepoint for one of the earlier
2011 * sub-transactions.
2012 */
2013 int64 i;
2014 int64 subidx;
2015 BufFile *fd;
2016 bool found = false;
2017 char path[MAXPGPATH];
2018
2019 subidx = -1;
2022
2023 for (i = subxact_data.nsubxacts; i > 0; i--)
2024 {
2025 if (subxact_data.subxacts[i - 1].xid == subxid)
2026 {
2027 subidx = (i - 1);
2028 found = true;
2029 break;
2030 }
2031 }
2032
2033 /*
2034 * If it's an empty sub-transaction then we will not find the subxid
2035 * here so just cleanup the subxact info and return.
2036 */
2037 if (!found)
2038 {
2039 /* Cleanup the subxact info */
2043 return;
2044 }
2045
2046 /* open the changes file */
2049 O_RDWR, false);
2050
2051 /* OK, truncate the file at the right offset */
2053 subxact_data.subxacts[subidx].offset);
2055
2056 /* discard the subxacts added later */
2057 subxact_data.nsubxacts = subidx;
2058
2059 /* write the updated subxact list */
2061
2064 }
2065}
static void cleanup_subxact_info(void)
Definition: worker.c:5564
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:5182
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:5231
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:928
int64_t int64
Definition: c.h:538
off_t offset
Definition: worker.c:533
TransactionId xid
Definition: worker.c:531
int fileno
Definition: worker.c:532

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), fd(), SubXactInfo::fileno, i, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_handle_stream_abort().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 5381 of file worker.c.

5382{
5383 char path[MAXPGPATH];
5384
5385 /* Delete the changes file. */
5386 changes_filename(path, subid, xid);
5388
5389 /* Delete the subxact file, if it exists. */
5390 subxact_filename(path, subid, xid);
5392}
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5360
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_close_file()

static void stream_close_file ( void  )
static

◆ stream_open_and_write_change()

static void stream_open_and_write_change ( TransactionId  xid,
char  action,
StringInfo  s 
)
static

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 5405 of file worker.c.

5406{
5407 char path[MAXPGPATH];
5408 MemoryContext oldcxt;
5409
5410 Assert(OidIsValid(subid));
5412 Assert(stream_fd == NULL);
5413
5414
5415 changes_filename(path, subid, xid);
5416 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5417
5418 /*
5419 * Create/open the buffiles under the logical streaming context so that we
5420 * have those files until stream stop.
5421 */
5423
5424 /*
5425 * If this is the first streamed segment, create the changes file.
5426 * Otherwise, just open the file for writing, in append mode.
5427 */
5428 if (first_segment)
5430 path);
5431 else
5432 {
5433 /*
5434 * Open the file and seek to the end of the file because we always
5435 * append to the changes file.
5436 */
5438 path, O_RDWR, false);
5439 BufFileSeek(stream_fd, 0, 0, SEEK_END);
5440 }
5441
5442 MemoryContextSwitchTo(oldcxt);
5443}
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267

References Assert(), BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by stream_start_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1687 of file worker.c.

1688{
1690
1691 /*
1692 * Initialize the worker's stream_fileset if we haven't yet. This will be
1693 * used for the entire duration of the worker so create it in a permanent
1694 * context. We create this on the very first streaming message from any
1695 * transaction and then use it for this and other streaming transactions.
1696 * Now, we could create a fileset at the start of the worker as well but
1697 * then we won't be sure that it will ever be used.
1698 */
1700 {
1701 MemoryContext oldctx;
1702
1704
1707
1708 MemoryContextSwitchTo(oldctx);
1709 }
1710
1711 /* Open the spool file for this transaction. */
1712 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1713
1714 /* If this is not the first segment, open existing subxact file. */
1715 if (!first_segment)
1717
1719}
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:5405
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1862 of file worker.c.

1863{
1864 /*
1865 * Serialize information about subxacts for the toplevel transaction, then
1866 * close the stream messages spool file.
1867 */
1870
1871 /* We must be in a valid transaction state */
1873
1874 /* Commit the per-stream transaction */
1876
1877 /* Reset per-stream context */
1879}

References Assert(), CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 5468 of file worker.c.

5469{
5470 int len;
5471
5472 Assert(stream_fd != NULL);
5473
5474 /* total on-disk size, including the action type character */
5475 len = (s->len - s->cursor) + sizeof(char);
5476
5477 /* first write the size */
5478 BufFileWrite(stream_fd, &len, sizeof(len));
5479
5480 /* then the action */
5481 BufFileWrite(stream_fd, &action, sizeof(action));
5482
5483 /* and finally the remaining part of the buffer (after the XID) */
5484 len = (s->len - s->cursor);
5485
5486 BufFileWrite(stream_fd, &s->data[s->cursor], len);
5487}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676

References generate_unaccent_rules::action, Assert(), BufFileWrite(), StringInfoData::cursor, StringInfoData::data, StringInfoData::len, len, and stream_fd.

Referenced by apply_handle_stream_start(), apply_handle_stream_stop(), handle_streamed_transaction(), and stream_open_and_write_change().

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 5167 of file worker.c.

5168{
5169 MySubscriptionValid = false;
5170}

References MySubscriptionValid.

Referenced by InitializeLogRepWorker().

◆ subxact_filename()

static void subxact_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inline static

Definition at line 5360 of file worker.c.

5361{
5362 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5363}

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 5282 of file worker.c.

5283{
5284 SubXactInfo *subxacts = subxact_data.subxacts;
5285 int64 i;
5286
5287 /* We must have a valid top level stream xid and a stream fd. */
5289 Assert(stream_fd != NULL);
5290
5291 /*
5292 * If the XID matches the toplevel transaction, we don't want to add it.
5293 */
5294 if (stream_xid == xid)
5295 return;
5296
5297 /*
5298 * In most cases we're checking the same subxact as we've already seen in
5299 * the last call, so make sure to ignore it (this change comes later).
5300 */
5301 if (subxact_data.subxact_last == xid)
5302 return;
5303
5304 /* OK, remember we're processing this XID. */
5306
5307 /*
5308 * Check if the transaction is already present in the array of subxact. We
5309 * intentionally scan the array from the tail, because we're likely adding
5310 * a change for the most recent subtransactions.
5311 *
5312 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5313 * would allow us to use binary search here.
5314 */
5315 for (i = subxact_data.nsubxacts; i > 0; i--)
5316 {
5317 /* found, so we're done */
5318 if (subxacts[i - 1].xid == xid)
5319 return;
5320 }
5321
5322 /* This is a new subxact, so we need to add it to the array. */
5323 if (subxact_data.nsubxacts == 0)
5324 {
5325 MemoryContext oldctx;
5326
5328
5329 /*
5330 * Allocate this memory for subxacts in per-stream context, see
5331 * subxact_info_read.
5332 */
5334 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5335 MemoryContextSwitchTo(oldctx);
5336 }
5338 {
5340 subxacts = repalloc(subxacts,
5342 }
5343
5344 subxacts[subxact_data.nsubxacts].xid = xid;
5345
5346 /*
5347 * Get the current offset of the stream file and store it as offset of
5348 * this subxact.
5349 */
5351 &subxacts[subxact_data.nsubxacts].fileno,
5352 &subxacts[subxact_data.nsubxacts].offset);
5353
5355 subxact_data.subxacts = subxacts;
5356}

References Assert(), BufFileTell(), SubXactInfo::fileno, i, LogicalStreamingContext, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc(), repalloc(), stream_fd, stream_xid, subxact_data, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

◆ subxact_info_read()

static void subxact_info_read ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5231 of file worker.c.

5232{
5233 char path[MAXPGPATH];
5234 Size len;
5235 BufFile *fd;
5236 MemoryContext oldctx;
5237
5241
5242 /*
5243 * If the subxact file doesn't exist that means we don't have any subxact
5244 * info.
5245 */
5246 subxact_filename(path, subid, xid);
5248 true);
5249 if (fd == NULL)
5250 return;
5251
5252 /* read number of subxact items */
5254
5256
5257 /* we keep the maximum as a power of 2 */
5259
5260 /*
5261 * Allocate subxact information in the logical streaming context. We need
5262 * this information during the complete stream so that we can add the sub
5263 * transaction info to this. On stream stop we will flush this information
5264 * to the subxact file and reset the logical streaming context.
5265 */
5268 sizeof(SubXactInfo));
5269 MemoryContextSwitchTo(oldctx);
5270
5271 if (len > 0)
5273
5275}
struct SubXactInfo SubXactInfo
size_t Size
Definition: c.h:613
static uint32 pg_ceil_log2_32(uint32 num)
Definition: pg_bitutils.h:258

References Assert(), BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), fd(), len, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc(), pg_ceil_log2_32(), LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and stream_start_internal().

◆ subxact_info_write()

static void subxact_info_write ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5182 of file worker.c.

5183{
5184 char path[MAXPGPATH];
5185 Size len;
5186 BufFile *fd;
5187
5189
5190 /* construct the subxact filename */
5191 subxact_filename(path, subid, xid);
5192
5193 /* Delete the subxacts file, if exists. */
5194 if (subxact_data.nsubxacts == 0)
5195 {
5198
5199 return;
5200 }
5201
5202 /*
5203 * Create the subxact file if it is not already created, otherwise open the
5204 * existing file.
5205 */
5207 true);
5208 if (fd == NULL)
5210
5212
5213 /* Write the subxact count and subxact info */
5216
5218
5219 /* free the memory allocated for subxact info */
5221}

References Assert(), BufFileClose(), BufFileCreateFileSet(), BufFileDeleteFileSet(), BufFileOpenFileSet(), BufFileWrite(), cleanup_subxact_info(), fd(), len, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by stream_abort_internal(), and stream_stop_internal().

◆ TargetPrivilegesCheck()

static void TargetPrivilegesCheck ( Relation  rel,
AclMode  mode 
)
static

Definition at line 2601 of file worker.c.

2602{
2603 Oid relid;
2604 AclResult aclresult;
2605
2606 relid = RelationGetRelid(rel);
2607 aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2608 if (aclresult != ACLCHECK_OK)
2609 aclcheck_error(aclresult,
2610 get_relkind_objtype(rel->rd_rel->relkind),
2611 get_rel_name(relid));
2612
2613 /*
2614 * We lack the infrastructure to honor RLS policies. It might be possible
2615 * to add such infrastructure here, but tablesync workers lack it, too, so
2616 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2617 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2618 * replicate subsequent INSERTs, so we forbid all commands the same.
2619 */
2620 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2621 ereport(ERROR,
2622 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2623 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2624 GetUserNameFromId(GetUserId(), true),
2625 RelationGetRelationName(rel))));
2626}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
Oid GetUserId(void)
Definition: miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:988
ObjectType get_relkind_objtype(char relkind)
static PgChecksumMode mode
Definition: pg_checksums.c:56
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45

References aclcheck_error(), ACLCHECK_OK, check_enable_rls(), ereport, errcode(), errmsg(), ERROR, get_rel_name(), get_relkind_objtype(), GetUserId(), GetUserNameFromId(), InvalidOid, mode, pg_class_aclcheck(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, and RLS_ENABLED.

Referenced by apply_handle_delete_internal(), apply_handle_insert_internal(), apply_handle_truncate(), apply_handle_tuple_routing(), apply_handle_update_internal(), and FindReplTupleInLocalRel().

◆ update_retention_status()

static bool update_retention_status ( bool  active)
static

Definition at line 4882 of file worker.c.

4883{
4884 /*
4885 * Do not update the catalog during an active transaction. The transaction
4886 * may be started during change application, leading to a possible
4887 * rollback of catalog updates if the application fails subsequently.
4888 */
4889 if (IsTransactionState())
4890 return false;
4891
4893
4894 /*
4895 * Updating pg_subscription might involve TOAST table access, so ensure we
4896 * have a valid snapshot.
4897 */
4899
4900 /* Update pg_subscription.subretentionactive */
4902
4905
4906 /* Notify launcher to update the conflict slot */
4908
4910
4911 return true;
4912}
void ApplyLauncherWakeup(void)
Definition: launcher.c:1194
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)

References ApplyLauncherWakeup(), CommitTransactionCommand(), GetTransactionSnapshot(), IsTransactionState(), MySubscription, Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), Subscription::retentionactive, StartTransactionCommand(), and UpdateDeadTupleRetentionStatus().

Referenced by resume_conflict_info_retention(), and stop_conflict_info_retention().

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

◆ wait_for_local_flush()

static void wait_for_local_flush ( RetainDeadTuplesData rdt_data)
static

Definition at line 4614 of file worker.c.

4615{
4616 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4618
4619 /*
4620 * We expect the publisher and subscriber clocks to be in sync using time
4621 * sync service like NTP. Otherwise, we will advance this worker's
4622 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4623 * required to detect update_deleted reliably. This check primarily
4624 * addresses scenarios where the publisher's clock falls behind; if the
4625 * publisher's clock is ahead, subsequent transactions will naturally bear
4626 * later commit timestamps, conforming to the design outlined atop
4627 * worker.c.
4628 *
4629 * XXX Consider waiting for the publisher's clock to catch up with the
4630 * subscriber's before proceeding to the next phase.
4631 */
4633 rdt_data->candidate_xid_time, 0))
4634 ereport(ERROR,
4635 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4636 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4637
4638 /*
4639 * Do not attempt to advance the non-removable transaction ID when table
4640 * sync is in progress. During this time, changes from a single
4641 * transaction may be applied by multiple table sync workers corresponding
4642 * to the target tables. So, it's necessary for all table sync workers to
4643 * apply and flush the corresponding changes before advancing the
4644 * transaction ID, otherwise, dead tuples that are still needed for
4645 * conflict detection in table sync workers could be removed prematurely.
4646 * However, confirming the apply and flush progress across all table sync
4647 * workers is complex and not worth the effort, so we simply return if not
4648 * all tables are in the READY state.
4649 *
4650 * Advancing the transaction ID is necessary even when no tables are
4651 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4652 * While it might seem safe to skip all phases and directly assign
4653 * candidate_xid to oldest_nonremovable_xid during the
4654 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4655 * concurrently add tables to the subscription, the apply worker may not
4656 * process invalidations in time. Consequently,
4657 * HasSubscriptionTablesCached() might miss the new tables, leading to
4658 * premature advancement of oldest_nonremovable_xid.
4659 *
4660 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4661 * invalidations are guaranteed to be processed before applying changes
4662 * from newly added tables while waiting for the local flush to reach
4663 * remote_lsn.
4664 *
4665 * Additionally, even if we check for subscription tables during
4666 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4667 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4668 * subscription tables at this stage to prevent unnecessary tuple
4669 * retention.
4670 */
4672 {
4674
4675 now = rdt_data->last_recv_time
4676 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4677
4678 /*
4679 * Record the time spent waiting for table sync, it is needed for the
4680 * timeout check in should_stop_conflict_info_retention().
4681 */
4682 rdt_data->table_sync_wait_time =
4684
4685 return;
4686 }
4687
4688 /*
4689 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4690 * retaining conflict information for this worker.
4691 */
4693 {
4695 return;
4696 }
4697
4698 /*
4699 * Update and check the remote flush position if we are applying changes
4700 * in a loop. This is done at most once per WalWriterDelay to avoid
4701 * performing costly operations in get_flush_position() too frequently
4702 * during change application.
4703 */
4704 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4706 rdt_data->last_recv_time, WalWriterDelay))
4707 {
4708 XLogRecPtr writepos;
4709 XLogRecPtr flushpos;
4710 bool have_pending_txes;
4711
4712 /* Fetch the latest remote flush position */
4713 get_flush_position(&writepos, &flushpos, &have_pending_txes);
4714
4715 if (flushpos > last_flushpos)
4716 last_flushpos = flushpos;
4717
4718 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4719 }
4720
4721 /* Return to wait for the changes to be applied */
4722 if (last_flushpos < rdt_data->remote_lsn)
4723 return;
4724
4725 /*
4726 * Reaching this point implies should_stop_conflict_info_retention()
4727 * returned false earlier, meaning that the most recent duration for
4728 * advancing the non-removable transaction ID is within the
4729 * max_retention_duration or max_retention_duration is set to 0.
4730 *
4731 * Therefore, if conflict info retention was previously stopped due to a
4732 * timeout, it is now safe to resume retention.
4733 */
4735 {
4737 return;
4738 }
4739
4740 /*
4741 * Reaching here means the remote WAL position has been received, and all
4742 * transactions up to that position on the publisher have been applied and
4743 * flushed locally. So, we can advance the non-removable transaction ID.
4744 */
4748
4749 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4750 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4751 rdt_data->candidate_xid);
4752
4753 /* Notify launcher to update the xmin of the conflict slot */
4755
4757
4758 /* process the next phase */
4759 process_rdt_phase_transition(rdt_data, false);
4760}
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4769
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
TimestampTz flushpos_update_time
Definition: worker.c:432
bool HasSubscriptionTablesCached(void)
Definition: tablesync.c:1630

References AllTablesyncsReady(), ApplyLauncherWakeup(), Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, DEBUG2, elog, ereport, errdetail_internal(), errmsg_internal(), ERROR, RetainDeadTuplesData::flushpos_update_time, get_flush_position(), GetCurrentTimestamp(), HasSubscriptionTablesCached(), last_flushpos, RetainDeadTuplesData::last_recv_time, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, now(), LogicalRepWorker::oldest_nonremovable_xid, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, LogicalRepWorker::relmutex, RetainDeadTuplesData::remote_lsn, RetainDeadTuplesData::reply_time, reset_retention_data_fields(), Subscription::retentionactive, should_stop_conflict_info_retention(), SpinLockAcquire, SpinLockRelease, RetainDeadTuplesData::table_sync_wait_time, TimestampDifferenceExceeds(), TimestampDifferenceMilliseconds(), TransactionIdIsValid, WalWriterDelay, and XLogRecPtrIsValid.

Referenced by process_rdt_phase_transition().

◆ wait_for_publisher_status()

static void wait_for_publisher_status ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4555 of file worker.c.

4557{
4558 /*
4559 * Return if we have requested but not yet received the publisher status.
4560 */
4561 if (!status_received)
4562 return;
4563
4564 /*
4565 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4566 * retaining conflict information for this worker.
4567 */
4569 {
4571 return;
4572 }
4573
4575 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4576
4577 /*
4578 * Check if all remote concurrent transactions that were active at the
4579 * first status request have now completed. If completed, proceed to the
4580 * next phase; otherwise, continue checking the publisher status until
4581 * these transactions finish.
4582 *
4583 * It's possible that transactions in the commit phase during the last
4584 * cycle have now finished committing, but remote_oldestxid remains older
4585 * than remote_wait_for. This can happen if some old transaction came in
4586 * the commit phase when we requested status in this cycle. We do not
4587 * handle this case explicitly as it's rare and the benefit doesn't
4588 * justify the required complexity. Tracking would require either caching
4589 * all xids at the publisher or sending them to subscribers. The condition
4590 * will resolve naturally once the remaining transactions are finished.
4591 *
4592 * Directly advancing the non-removable transaction ID is possible if
4593 * there are no activities on the publisher since the last advancement
4594 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4595 * and last_remote_lsn, within the structure for comparison with the
4596 * current cycle's values. Considering the minimal cost of continuing in
4597 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4598 * advance the transaction ID here.
4599 */
4601 rdt_data->remote_oldestxid))
4602 rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4603 else
4605
4606 /* process the next phase */
4607 process_rdt_phase_transition(rdt_data, false);
4608}
#define FullTransactionIdPrecedesOrEquals(a, b)
Definition: transam.h:52
#define FullTransactionIdIsValid(x)
Definition: transam.h:55

References FullTransactionIdIsValid, FullTransactionIdPrecedesOrEquals, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, RetainDeadTuplesData::remote_nextxid, RetainDeadTuplesData::remote_oldestxid, RetainDeadTuplesData::remote_wait_for, and should_stop_conflict_info_retention().

Referenced by process_rdt_phase_transition().

Variable Documentation

◆ apply_error_callback_arg

ApplyErrorCallbackArg apply_error_callback_arg
static
Initial value:
=
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}

Definition at line 459 of file worker.c.

Referenced by apply_dispatch(), apply_error_callback(), apply_handle_delete(), apply_handle_insert(), apply_handle_update(), reset_apply_error_context_info(), set_apply_error_context_origin(), set_apply_error_context_xact(), slot_modify_data(), and slot_store_data().

◆ apply_error_context_stack

ErrorContextCallback* apply_error_context_stack = NULL

Definition at line 469 of file worker.c.

Referenced by LogicalRepApplyLoop(), and ProcessParallelApplyMessage().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ in_streamed_transaction

◆ InitializingApplyWorker

bool InitializingApplyWorker = false

Definition at line 499 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ last_flushpos

XLogRecPtr last_flushpos = InvalidXLogRecPtr
static

Definition at line 527 of file worker.c.

Referenced by send_feedback(), and wait_for_local_flush().

◆ LogicalStreamingContext

MemoryContext LogicalStreamingContext = NULL
static

◆ LogRepWorkerWalRcvConn

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 308 of file worker.c.

Referenced by get_flush_position(), LogicalRepApplyLoop(), and store_flush_position().

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false
static

◆ on_commit_wakeup_workers_subids

List* on_commit_wakeup_workers_subids = NIL
static

Definition at line 482 of file worker.c.

Referenced by AtEOXact_LogicalRepWorkers(), and LogicalRepWorkersWakeupAtCommit().

◆ parallel_stream_nchanges

uint32 parallel_stream_nchanges = 0
static

◆ remote_final_lsn

◆ skip_xact_finish_lsn

XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
static

Definition at line 516 of file worker.c.

Referenced by maybe_start_skipping_changes(), and stop_skipping_changes().

◆ stream_fd

◆ stream_xid

◆ subxact_data