PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  RetainDeadTuplesData
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define MIN_XID_ADVANCE_INTERVAL   100
 
#define MAX_XID_ADVANCE_INTERVAL   180000
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct RetainDeadTuplesData RetainDeadTuplesData
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 
enum  RetainDeadTuplesPhase {
  RDT_GET_CANDIDATE_XID , RDT_REQUEST_PUBLISHER_STATUS , RDT_WAIT_FOR_PUBLISHER_STATUS , RDT_WAIT_FOR_LOCAL_FLUSH ,
  RDT_STOP_CONFLICT_INFO_RETENTION , RDT_RESUME_CONFLICT_INFO_RETENTION
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void maybe_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data, bool status_received)
 
static bool can_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data)
 
static void process_rdt_phase_transition (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void get_candidate_xid (RetainDeadTuplesData *rdt_data)
 
static void request_publisher_status (RetainDeadTuplesData *rdt_data)
 
static void wait_for_publisher_status (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void wait_for_local_flush (RetainDeadTuplesData *rdt_data)
 
static bool should_stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void resume_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static bool update_retention_status (bool active)
 
static void reset_retention_data_fields (RetainDeadTuplesData *rdt_data)
 
static void adjust_xid_advance_interval (RetainDeadTuplesData *rdt_data, bool new_xid_found)
 
static void apply_worker_exit (void)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static bool FindDeletedTupleInLocalRel (Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
static void replorigin_reset (int code, Datum arg)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionDatacreate_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static bool IsIndexUsableForFindingDeletedTuple (Oid localindexoid, TransactionId conflict_detection_xmin)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker ()
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallbackapply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnLogRepWorkerWalRcvConn = NULL
 
SubscriptionMySubscription = NULL
 
static bool MySubscriptionValid = false
 
static Liston_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFilestream_fd = NULL
 
static XLogRecPtr last_flushpos = InvalidXLogRecPtr
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 517 of file worker.c.

◆ MAX_XID_ADVANCE_INTERVAL

#define MAX_XID_ADVANCE_INTERVAL   180000

Definition at line 456 of file worker.c.

◆ MIN_XID_ADVANCE_INTERVAL

#define MIN_XID_ADVANCE_INTERVAL   100

Definition at line 455 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 299 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

◆ ApplyExecutionData

◆ ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ RetainDeadTuplesData

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ RetainDeadTuplesPhase

Enumerator
RDT_GET_CANDIDATE_XID 
RDT_REQUEST_PUBLISHER_STATUS 
RDT_WAIT_FOR_PUBLISHER_STATUS 
RDT_WAIT_FOR_LOCAL_FLUSH 
RDT_STOP_CONFLICT_INFO_RETENTION 
RDT_RESUME_CONFLICT_INFO_RETENTION 

Definition at line 387 of file worker.c.

388{
RetainDeadTuplesPhase
Definition: worker.c:388
@ RDT_WAIT_FOR_PUBLISHER_STATUS
Definition: worker.c:391
@ RDT_RESUME_CONFLICT_INFO_RETENTION
Definition: worker.c:394
@ RDT_GET_CANDIDATE_XID
Definition: worker.c:389
@ RDT_REQUEST_PUBLISHER_STATUS
Definition: worker.c:390
@ RDT_WAIT_FOR_LOCAL_FLUSH
Definition: worker.c:392
@ RDT_STOP_CONFLICT_INFO_RETENTION
Definition: worker.c:393

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 369 of file worker.c.

370{
371 /* The action for non-streaming transactions. */
373
374 /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:370
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:375
@ TRANS_PARALLEL_APPLY
Definition: worker.c:378
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:376
@ TRANS_LEADER_APPLY
Definition: worker.c:372
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:377

Function Documentation

◆ adjust_xid_advance_interval()

static void adjust_xid_advance_interval ( RetainDeadTuplesData rdt_data,
bool  new_xid_found 
)
static

Definition at line 4924 of file worker.c.

4925{
4926 if (rdt_data->xid_advance_interval && !new_xid_found)
4927 {
4928 int max_interval = wal_receiver_status_interval
4931
4932 /*
4933 * No new transaction ID has been assigned since the last check, so
4934 * double the interval, but not beyond the maximum allowable value.
4935 */
4936 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4937 max_interval);
4938 }
4939 else if (rdt_data->xid_advance_interval &&
4941 {
4942 /*
4943 * Retention has been stopped, so double the interval-capped at a
4944 * maximum of 3 minutes. The wal_receiver_status_interval is
4945 * intentionally not used as a upper bound, since the likelihood of
4946 * retention resuming is lower than that of general activity resuming.
4947 */
4948 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4950 }
4951 else
4952 {
4953 /*
4954 * A new transaction ID was found or the interval is not yet
4955 * initialized, so set the interval to the minimum value.
4956 */
4958 }
4959
4960 /*
4961 * Ensure the wait time remains within the maximum retention time limit
4962 * when retention is active.
4963 */
4965 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4967}
#define MAX_XID_ADVANCE_INTERVAL
Definition: worker.c:456
#define MIN_XID_ADVANCE_INTERVAL
Definition: worker.c:455
Subscription * MySubscription
Definition: worker.c:479
#define Min(x, y)
Definition: c.h:1003
int xid_advance_interval
Definition: worker.c:445
int wal_receiver_status_interval
Definition: walreceiver.c:88

References MAX_XID_ADVANCE_INTERVAL, Subscription::maxretention, Min, MIN_XID_ADVANCE_INTERVAL, MySubscription, Subscription::retentionactive, wal_receiver_status_interval, and RetainDeadTuplesData::xid_advance_interval.

Referenced by get_candidate_xid().

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3747 of file worker.c.

3748{
3750 LogicalRepMsgType saved_command;
3751
3752 /*
3753 * Set the current command being applied. Since this function can be
3754 * called recursively when applying spooled changes, save the current
3755 * command.
3756 */
3757 saved_command = apply_error_callback_arg.command;
3759
3760 switch (action)
3761 {
3764 break;
3765
3768 break;
3769
3772 break;
3773
3776 break;
3777
3780 break;
3781
3784 break;
3785
3788 break;
3789
3792 break;
3793
3796 break;
3797
3799
3800 /*
3801 * Logical replication does not use generic logical messages yet.
3802 * Although, it could be used by other applications that use this
3803 * output plugin.
3804 */
3805 break;
3806
3809 break;
3810
3813 break;
3814
3817 break;
3818
3821 break;
3822
3825 break;
3826
3829 break;
3830
3833 break;
3834
3837 break;
3838
3841 break;
3842
3843 default:
3844 ereport(ERROR,
3845 (errcode(ERRCODE_PROTOCOL_VIOLATION),
3846 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3847 }
3848
3849 /* Reset the current command */
3850 apply_error_callback_arg.command = saved_command;
3851}
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1500
static void apply_handle_type(StringInfo s)
Definition: worker.c:2561
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3619
static void apply_handle_update(StringInfo s)
Definition: worker.c:2765
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2368
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1393
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:459
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2987
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1205
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1230
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:2049
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2538
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1322
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1442
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1863
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1645
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1256
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1704
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2608
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
LogicalRepMsgType command
Definition: worker.c:325

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 6118 of file worker.c.

6119{
6121
6123 return;
6124
6125 Assert(errarg->origin_name);
6126
6127 if (errarg->rel == NULL)
6128 {
6129 if (!TransactionIdIsValid(errarg->remote_xid))
6130 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6131 errarg->origin_name,
6133 else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6134 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6135 errarg->origin_name,
6137 errarg->remote_xid);
6138 else
6139 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6140 errarg->origin_name,
6142 errarg->remote_xid,
6143 LSN_FORMAT_ARGS(errarg->finish_lsn));
6144 }
6145 else
6146 {
6147 if (errarg->remote_attnum < 0)
6148 {
6149 if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6150 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6151 errarg->origin_name,
6153 errarg->rel->remoterel.nspname,
6154 errarg->rel->remoterel.relname,
6155 errarg->remote_xid);
6156 else
6157 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6158 errarg->origin_name,
6160 errarg->rel->remoterel.nspname,
6161 errarg->rel->remoterel.relname,
6162 errarg->remote_xid,
6163 LSN_FORMAT_ARGS(errarg->finish_lsn));
6164 }
6165 else
6166 {
6167 if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6168 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6169 errarg->origin_name,
6171 errarg->rel->remoterel.nspname,
6172 errarg->rel->remoterel.relname,
6173 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6174 errarg->remote_xid);
6175 else
6176 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6177 errarg->origin_name,
6179 errarg->rel->remoterel.nspname,
6180 errarg->rel->remoterel.relname,
6181 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6182 errarg->remote_xid,
6183 LSN_FORMAT_ARGS(errarg->finish_lsn));
6184 }
6185 }
6186}
#define errcontext
Definition: elog.h:198
Assert(PointerIsAligned(start, uint64))
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1209
TransactionId remote_xid
Definition: worker.c:330
XLogRecPtr finish_lsn
Definition: worker.c:331
LogicalRepRelMapEntry * rel
Definition: worker.c:326
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 1205 of file worker.c.

1206{
1207 LogicalRepBeginData begin_data;
1208
1209 /* There must not be an active streaming transaction. */
1211
1212 logicalrep_read_begin(s, &begin_data);
1213 set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1214
1215 remote_final_lsn = begin_data.final_lsn;
1216
1218
1219 in_remote_transaction = true;
1220
1222}
bool in_remote_transaction
Definition: worker.c:484
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:6190
static XLogRecPtr remote_final_lsn
Definition: worker.c:485
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:5980
static TransactionId stream_xid
Definition: worker.c:490
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:63
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert(), LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1256 of file worker.c.

1257{
1258 LogicalRepPreparedTxnData begin_data;
1259
1260 /* Tablesync should never receive prepare. */
1261 if (am_tablesync_worker())
1262 ereport(ERROR,
1263 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1264 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1265
1266 /* There must not be an active streaming transaction. */
1268
1269 logicalrep_read_begin_prepare(s, &begin_data);
1270 set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1271
1272 remote_final_lsn = begin_data.prepare_lsn;
1273
1275
1276 in_remote_transaction = true;
1277
1279}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1161
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:134
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1230 of file worker.c.

1231{
1232 LogicalRepCommitData commit_data;
1233
1234 logicalrep_read_commit(s, &commit_data);
1235
1236 if (commit_data.commit_lsn != remote_final_lsn)
1237 ereport(ERROR,
1238 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1239 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1240 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1242
1243 apply_handle_commit_internal(&commit_data);
1244
1245 /* Process any tables that are being synchronized in parallel. */
1246 process_syncing_tables(commit_data.end_lsn);
1247
1250}
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2478
static void reset_apply_error_context_info(void)
Definition: worker.c:6198
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:98
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:696

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData commit_data)
static

Definition at line 2478 of file worker.c.

2479{
2480 if (is_skipping_changes())
2481 {
2483
2484 /*
2485 * Start a new transaction to clear the subskiplsn, if not started
2486 * yet.
2487 */
2488 if (!IsTransactionState())
2490 }
2491
2492 if (IsTransactionState())
2493 {
2494 /*
2495 * The transaction is either non-empty or skipped, so we clear the
2496 * subskiplsn.
2497 */
2499
2500 /*
2501 * Update origin state so we can restart streaming from correct
2502 * position in case of crash.
2503 */
2506
2508
2509 if (IsTransactionBlock())
2510 {
2511 EndTransactionBlock(false);
2513 }
2514
2515 pgstat_report_stat(false);
2516
2518 }
2519 else
2520 {
2521 /* Process any invalidation messages that might have accumulated. */
2524 }
2525
2526 in_remote_transaction = false;
2527}
static void stop_skipping_changes(void)
Definition: worker.c:6007
#define is_skipping_changes()
Definition: worker.c:517
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:6029
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3911
void maybe_reread_subscription(void)
Definition: worker.c:5007
void AcceptInvalidationMessages(void)
Definition: inval.c:930
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:387
void StartTransactionCommand(void)
Definition: xact.c:3071
bool IsTransactionBlock(void)
Definition: xact.c:4983
void CommitTransactionCommand(void)
Definition: xact.c:3169
bool EndTransactionBlock(bool chain)
Definition: xact.c:4056
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:256

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1393 of file worker.c.

1394{
1396 char gid[GIDSIZE];
1397
1398 logicalrep_read_commit_prepared(s, &prepare_data);
1399 set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1400
1401 /* Compute GID for two_phase transactions. */
1403 gid, sizeof(gid));
1404
1405 /* There is no transaction when COMMIT PREPARED is called */
1407
1408 /*
1409 * Update origin state so we can restart streaming from correct position
1410 * in case of crash.
1411 */
1414
1415 FinishPreparedTransaction(gid, true);
1418 pgstat_report_stat(false);
1419
1421 in_remote_transaction = false;
1422
1423 /* Process any tables that are being synchronized in parallel. */
1424 process_syncing_tables(prepare_data.end_lsn);
1425
1427
1430}
static void begin_replication_step(void)
Definition: worker.c:721
static void end_replication_step(void)
Definition: worker.c:744
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:267
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition: twophase.c:2747
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1497
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2987 of file worker.c.

2988{
2990 LogicalRepTupleData oldtup;
2991 LogicalRepRelId relid;
2992 UserContext ucxt;
2993 ApplyExecutionData *edata;
2994 EState *estate;
2995 TupleTableSlot *remoteslot;
2996 MemoryContext oldctx;
2997 bool run_as_owner;
2998
2999 /*
3000 * Quick return if we are skipping data modification changes or handling
3001 * streamed transactions.
3002 */
3003 if (is_skipping_changes() ||
3005 return;
3006
3008
3009 relid = logicalrep_read_delete(s, &oldtup);
3012 {
3013 /*
3014 * The relation can't become interesting in the middle of the
3015 * transaction so it's safe to unlock it.
3016 */
3019 return;
3020 }
3021
3022 /* Set relation for error callback */
3024
3025 /* Check if we can do the delete. */
3027
3028 /*
3029 * Make sure that any user-supplied code runs as the table owner, unless
3030 * the user has opted out of that behavior.
3031 */
3032 run_as_owner = MySubscription->runasowner;
3033 if (!run_as_owner)
3034 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3035
3036 /* Initialize the executor state. */
3037 edata = create_edata_for_relation(rel);
3038 estate = edata->estate;
3039 remoteslot = ExecInitExtraTupleSlot(estate,
3041 &TTSOpsVirtual);
3042
3043 /* Build the search tuple. */
3045 slot_store_data(remoteslot, rel, &oldtup);
3046 MemoryContextSwitchTo(oldctx);
3047
3048 /* For a partitioned table, apply delete to correct partition. */
3049 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3051 remoteslot, NULL, CMD_DELETE);
3052 else
3053 {
3054 ResultRelInfo *relinfo = edata->targetRelInfo;
3055
3056 ExecOpenIndices(relinfo, false);
3057 apply_handle_delete_internal(edata, relinfo,
3058 remoteslot, rel->localindexoid);
3059 ExecCloseIndices(relinfo);
3060 }
3061
3062 finish_edata(edata);
3063
3064 /* Reset relation for error callback */
3066
3067 if (!run_as_owner)
3068 RestoreUserContext(&ucxt);
3069
3071
3073}
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2724
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:865
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:681
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:772
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:3325
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:1011
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:923
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:3081
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:238
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:160
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2020
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:658
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:278
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:561
#define RelationGetDescr(relation)
Definition: rel.h:540
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:504
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:349
ResultRelInfo * targetRelInfo
Definition: worker.c:315
EState * estate
Definition: worker.c:312
Form_pg_class rd_rel
Definition: rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 3081 of file worker.c.

3085{
3086 EState *estate = edata->estate;
3087 Relation localrel = relinfo->ri_RelationDesc;
3088 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3089 EPQState epqstate;
3090 TupleTableSlot *localslot;
3091 ConflictTupleInfo conflicttuple = {0};
3092 bool found;
3093
3094 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3095
3096 /* Caller should have opened indexes already. */
3097 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3098 !localrel->rd_rel->relhasindex ||
3099 RelationGetIndexList(localrel) == NIL);
3100
3101 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3102 remoteslot, &localslot);
3103
3104 /* If found delete it. */
3105 if (found)
3106 {
3107 /*
3108 * Report the conflict if the tuple was modified by a different
3109 * origin.
3110 */
3111 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3112 &conflicttuple.origin, &conflicttuple.ts) &&
3113 conflicttuple.origin != replorigin_session_origin)
3114 {
3115 conflicttuple.slot = localslot;
3117 remoteslot, NULL,
3118 list_make1(&conflicttuple));
3119 }
3120
3121 EvalPlanQualSetSlot(&epqstate, localslot);
3122
3123 /* Do the actual delete. */
3125 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3126 }
3127 else
3128 {
3129 /*
3130 * The tuple to be deleted could not be found. Do nothing except for
3131 * emitting a log message.
3132 */
3133 ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3134 remoteslot, NULL, list_make1(&conflicttuple));
3135 }
3136
3137 /* Cleanup. */
3138 EvalPlanQualEnd(&epqstate);
3139}
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:3149
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2576
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition: conflict.c:104
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:63
@ CT_DELETE_MISSING
Definition: conflict.h:52
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:49
#define LOG
Definition: elog.h:31
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition: execMain.c:2708
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:3169
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:286
RepOriginId replorigin_session_origin
Definition: origin.c:163
#define ACL_DELETE
Definition: parsenodes.h:79
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
List * RelationGetIndexList(Relation relation)
Definition: relcache.c:4836
LogicalRepRelMapEntry * targetRel
Definition: worker.c:314
TimestampTz ts
Definition: conflict.h:78
RepOriginId origin
Definition: conflict.h:77
TransactionId xmin
Definition: conflict.h:75
TupleTableSlot * slot
Definition: conflict.h:71
Relation ri_RelationDesc
Definition: execnodes.h:480
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:486

References ACL_DELETE, Assert(), CT_DELETE_MISSING, CT_DELETE_ORIGIN_DIFFERS, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), GetTupleTransactionInfo(), list_make1, LOG, NIL, ConflictTupleInfo::origin, RelationData::rd_rel, RelationGetIndexList(), LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_IndexRelationDescs, ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ConflictTupleInfo::ts, and ConflictTupleInfo::xmin.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2608 of file worker.c.

2609{
2611 LogicalRepTupleData newtup;
2612 LogicalRepRelId relid;
2613 UserContext ucxt;
2614 ApplyExecutionData *edata;
2615 EState *estate;
2616 TupleTableSlot *remoteslot;
2617 MemoryContext oldctx;
2618 bool run_as_owner;
2619
2620 /*
2621 * Quick return if we are skipping data modification changes or handling
2622 * streamed transactions.
2623 */
2624 if (is_skipping_changes() ||
2626 return;
2627
2629
2630 relid = logicalrep_read_insert(s, &newtup);
2633 {
2634 /*
2635 * The relation can't become interesting in the middle of the
2636 * transaction so it's safe to unlock it.
2637 */
2640 return;
2641 }
2642
2643 /*
2644 * Make sure that any user-supplied code runs as the table owner, unless
2645 * the user has opted out of that behavior.
2646 */
2647 run_as_owner = MySubscription->runasowner;
2648 if (!run_as_owner)
2649 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2650
2651 /* Set relation for error callback */
2653
2654 /* Initialize the executor state. */
2655 edata = create_edata_for_relation(rel);
2656 estate = edata->estate;
2657 remoteslot = ExecInitExtraTupleSlot(estate,
2659 &TTSOpsVirtual);
2660
2661 /* Process and store remote tuple in the slot */
2663 slot_store_data(remoteslot, rel, &newtup);
2664 slot_fill_defaults(rel, estate, remoteslot);
2665 MemoryContextSwitchTo(oldctx);
2666
2667 /* For a partitioned table, insert the tuple into a partition. */
2668 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2670 remoteslot, NULL, CMD_INSERT);
2671 else
2672 {
2673 ResultRelInfo *relinfo = edata->targetRelInfo;
2674
2675 ExecOpenIndices(relinfo, false);
2676 apply_handle_insert_internal(edata, relinfo, remoteslot);
2677 ExecCloseIndices(relinfo);
2678 }
2679
2680 finish_edata(edata);
2681
2682 /* Reset relation for error callback */
2684
2685 if (!run_as_owner)
2686 RestoreUserContext(&ucxt);
2687
2689
2691}
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2699
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:954
@ CMD_INSERT
Definition: nodes.h:277
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:428

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2699 of file worker.c.

2702{
2703 EState *estate = edata->estate;
2704
2705 /* Caller should have opened indexes already. */
2706 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2707 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2709
2710 /* Caller will not have done this bit. */
2712 InitConflictIndexes(relinfo);
2713
2714 /* Do the insert. */
2716 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2717}
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:139
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:76
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:580

References ACL_INSERT, Assert(), ApplyExecutionData::estate, ExecSimpleRelationInsert(), InitConflictIndexes(), NIL, RelationData::rd_rel, RelationGetIndexList(), ResultRelInfo::ri_IndexRelationDescs, ResultRelInfo::ri_onConflictArbiterIndexes, ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1645 of file worker.c.

1646{
1647 /*
1648 * ORIGIN message can only come inside streaming transaction or inside
1649 * remote transaction and before any actual writes.
1650 */
1654 ereport(ERROR,
1655 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656 errmsg_internal("ORIGIN message sent out of order")));
1657}
static bool in_streamed_transaction
Definition: worker.c:488

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1322 of file worker.c.

1323{
1324 LogicalRepPreparedTxnData prepare_data;
1325
1326 logicalrep_read_prepare(s, &prepare_data);
1327
1328 if (prepare_data.prepare_lsn != remote_final_lsn)
1329 ereport(ERROR,
1330 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1331 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1332 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1334
1335 /*
1336 * Unlike commit, here, we always prepare the transaction even though no
1337 * change has happened in this transaction or all changes are skipped. It
1338 * is done this way because at commit prepared time, we won't know whether
1339 * we have skipped preparing a transaction because of those reasons.
1340 *
1341 * XXX, We can optimize such that at commit prepared time, we first check
1342 * whether we have prepared the transaction or not but that doesn't seem
1343 * worthwhile because such cases shouldn't be common.
1344 */
1346
1347 apply_handle_prepare_internal(&prepare_data);
1348
1351 pgstat_report_stat(false);
1352
1353 /*
1354 * It is okay not to set the local_end LSN for the prepare because we
1355 * always flush the prepare record. So, we can send the acknowledgment of
1356 * the remote_end LSN as soon as prepare is finished.
1357 *
1358 * XXX For the sake of consistency with commit, we could have set it with
1359 * the LSN of prepare but as of now we don't track that value similar to
1360 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1361 * it.
1362 */
1364
1365 in_remote_transaction = false;
1366
1367 /* Process any tables that are being synchronized in parallel. */
1368 process_syncing_tables(prepare_data.end_lsn);
1369
1370 /*
1371 * Since we have already prepared the transaction, in a case where the
1372 * server crashes before clearing the subskiplsn, it will be left but the
1373 * transaction won't be resent. But that's okay because it's a rare case
1374 * and the subskiplsn will be cleared when finishing the next transaction.
1375 */
1378
1381}
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1285
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:228
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1285 of file worker.c.

1286{
1287 char gid[GIDSIZE];
1288
1289 /*
1290 * Compute unique GID for two_phase transactions. We don't use GID of
1291 * prepared transaction sent by server as that can lead to deadlock when
1292 * we have multiple subscriptions from same node point to publications on
1293 * the same node. See comments atop worker.c
1294 */
1296 gid, sizeof(gid));
1297
1298 /*
1299 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1300 * called within the PrepareTransactionBlock below.
1301 */
1302 if (!IsTransactionBlock())
1303 {
1305 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1306 }
1307
1308 /*
1309 * Update origin state so we can restart streaming from correct position
1310 * in case of crash.
1311 */
1312 replorigin_session_origin_lsn = prepare_data->end_lsn;
1314
1316}
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:4004
void BeginTransactionBlock(void)
Definition: xact.c:3936

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2538 of file worker.c.

2539{
2540 LogicalRepRelation *rel;
2541
2543 return;
2544
2545 rel = logicalrep_read_rel(s);
2547
2548 /* Also reset all entries in the partition map that refer to remoterel. */
2550}
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:698
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:571
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1442 of file worker.c.

1443{
1445 char gid[GIDSIZE];
1446
1447 logicalrep_read_rollback_prepared(s, &rollback_data);
1448 set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1449
1450 /* Compute GID for two_phase transactions. */
1452 gid, sizeof(gid));
1453
1454 /*
1455 * It is possible that we haven't received prepare because it occurred
1456 * before walsender reached a consistent point or the two_phase was still
1457 * not enabled by that time, so in such cases, we need to skip rollback
1458 * prepared.
1459 */
1460 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1461 rollback_data.prepare_time))
1462 {
1463 /*
1464 * Update origin state so we can restart streaming from correct
1465 * position in case of crash.
1466 */
1469
1470 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1472 FinishPreparedTransaction(gid, false);
1475
1477 }
1478
1479 pgstat_report_stat(false);
1480
1481 /*
1482 * It is okay not to set the local_end LSN for the rollback of prepared
1483 * transaction because we always flush the WAL record for it. See
1484 * apply_handle_prepare.
1485 */
1487 in_remote_transaction = false;
1488
1489 /* Process any tables that are being synchronized in parallel. */
1491
1494}
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:325
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2688

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 2049 of file worker.c.

2050{
2051 TransactionId xid;
2052 TransactionId subxid;
2053 LogicalRepStreamAbortData abort_data;
2055 TransApplyAction apply_action;
2056
2057 /* Save the message before it is consumed. */
2058 StringInfoData original_msg = *s;
2059 bool toplevel_xact;
2060
2062 ereport(ERROR,
2063 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2064 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2065
2066 /* We receive abort information only when we can apply in parallel. */
2067 logicalrep_read_stream_abort(s, &abort_data,
2069
2070 xid = abort_data.xid;
2071 subxid = abort_data.subxid;
2072 toplevel_xact = (xid == subxid);
2073
2074 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2075
2076 apply_action = get_transaction_apply_action(xid, &winfo);
2077
2078 switch (apply_action)
2079 {
2080 case TRANS_LEADER_APPLY:
2081
2082 /*
2083 * We are in the leader apply worker and the transaction has been
2084 * serialized to file.
2085 */
2086 stream_abort_internal(xid, subxid);
2087
2088 elog(DEBUG1, "finished processing the STREAM ABORT command");
2089 break;
2090
2092 Assert(winfo);
2093
2094 /*
2095 * For the case of aborting the subtransaction, we increment the
2096 * number of streaming blocks and take the lock again before
2097 * sending the STREAM_ABORT to ensure that the parallel apply
2098 * worker will wait on the lock for the next set of changes after
2099 * processing the STREAM_ABORT message if it is not already
2100 * waiting for STREAM_STOP message.
2101 *
2102 * It is important to perform this locking before sending the
2103 * STREAM_ABORT message so that the leader can hold the lock first
2104 * and the parallel apply worker will wait for the leader to
2105 * release the lock. This is the same as what we do in
2106 * apply_handle_stream_stop. See Locking Considerations atop
2107 * applyparallelworker.c.
2108 */
2109 if (!toplevel_xact)
2110 {
2114 }
2115
2116 if (pa_send_data(winfo, s->len, s->data))
2117 {
2118 /*
2119 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2120 * wait here for the parallel apply worker to finish as that
2121 * is not required to maintain the commit order and won't have
2122 * the risk of failures due to transaction dependencies and
2123 * deadlocks. However, it is possible that before the parallel
2124 * worker finishes and we clear the worker info, the xid
2125 * wraparound happens on the upstream and a new transaction
2126 * with the same xid can appear and that can lead to duplicate
2127 * entries in ParallelApplyTxnHash. Yet another problem could
2128 * be that we may have serialized the changes in partial
2129 * serialize mode and the file containing xact changes may
2130 * already exist, and after xid wraparound trying to create
2131 * the file for the same xid can lead to an error. To avoid
2132 * these problems, we decide to wait for the aborts to finish.
2133 *
2134 * Note, it is okay to not update the flush location position
2135 * for aborts as in worst case that means such a transaction
2136 * won't be sent again after restart.
2137 */
2138 if (toplevel_xact)
2140
2141 break;
2142 }
2143
2144 /*
2145 * Switch to serialize mode when we are not able to send the
2146 * change to parallel apply worker.
2147 */
2148 pa_switch_to_partial_serialize(winfo, true);
2149
2150 /* fall through */
2152 Assert(winfo);
2153
2154 /*
2155 * Parallel apply worker might have applied some changes, so write
2156 * the STREAM_ABORT message so that it can rollback the
2157 * subtransaction if needed.
2158 */
2160 &original_msg);
2161
2162 if (toplevel_xact)
2163 {
2166 }
2167 break;
2168
2170
2171 /*
2172 * If the parallel apply worker is applying spooled messages then
2173 * close the file before aborting.
2174 */
2175 if (toplevel_xact && stream_fd)
2177
2178 pa_stream_abort(&abort_data);
2179
2180 /*
2181 * We need to wait after processing rollback to savepoint for the
2182 * next set of changes.
2183 *
2184 * We have a race condition here due to which we can start waiting
2185 * here when there are more chunk of streams in the queue. See
2186 * apply_handle_stream_stop.
2187 */
2188 if (!toplevel_xact)
2190
2191 elog(DEBUG1, "finished processing the STREAM ABORT command");
2192 break;
2193
2194 default:
2195 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2196 break;
2197 }
2198
2200}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:422
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:6275
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:5467
static BufFile * stream_fd
Definition: worker.c:520
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1966
static void stream_close_file(void)
Definition: worker.c:5419
uint32 TransactionId
Definition: c.h:657
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1184
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2368 of file worker.c.

2369{
2370 TransactionId xid;
2371 LogicalRepCommitData commit_data;
2373 TransApplyAction apply_action;
2374
2375 /* Save the message before it is consumed. */
2376 StringInfoData original_msg = *s;
2377
2379 ereport(ERROR,
2380 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2381 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2382
2383 xid = logicalrep_read_stream_commit(s, &commit_data);
2384 set_apply_error_context_xact(xid, commit_data.commit_lsn);
2385
2386 apply_action = get_transaction_apply_action(xid, &winfo);
2387
2388 switch (apply_action)
2389 {
2390 case TRANS_LEADER_APPLY:
2391
2392 /*
2393 * The transaction has been serialized to file, so replay all the
2394 * spooled operations.
2395 */
2397 commit_data.commit_lsn);
2398
2399 apply_handle_commit_internal(&commit_data);
2400
2401 /* Unlink the files with serialized changes and subxact info. */
2403
2404 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2405 break;
2406
2408 Assert(winfo);
2409
2410 if (pa_send_data(winfo, s->len, s->data))
2411 {
2412 /* Finish processing the streaming transaction. */
2413 pa_xact_finish(winfo, commit_data.end_lsn);
2414 break;
2415 }
2416
2417 /*
2418 * Switch to serialize mode when we are not able to send the
2419 * change to parallel apply worker.
2420 */
2421 pa_switch_to_partial_serialize(winfo, true);
2422
2423 /* fall through */
2425 Assert(winfo);
2426
2428 &original_msg);
2429
2431
2432 /* Finish processing the streaming transaction. */
2433 pa_xact_finish(winfo, commit_data.end_lsn);
2434 break;
2435
2437
2438 /*
2439 * If the parallel apply worker is applying spooled messages then
2440 * close the file before committing.
2441 */
2442 if (stream_fd)
2444
2445 apply_handle_commit_internal(&commit_data);
2446
2448
2449 /*
2450 * It is important to set the transaction state as finished before
2451 * releasing the lock. See pa_wait_for_xact_finish.
2452 */
2455
2457
2458 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2459 break;
2460
2461 default:
2462 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2463 break;
2464 }
2465
2466 /* Process any tables that are being synchronized in parallel. */
2467 process_syncing_tables(commit_data.end_lsn);
2468
2470
2472}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:5350
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2238
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1129
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert(), LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1500 of file worker.c.

1501{
1502 LogicalRepPreparedTxnData prepare_data;
1504 TransApplyAction apply_action;
1505
1506 /* Save the message before it is consumed. */
1507 StringInfoData original_msg = *s;
1508
1510 ereport(ERROR,
1511 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1512 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1513
1514 /* Tablesync should never receive prepare. */
1515 if (am_tablesync_worker())
1516 ereport(ERROR,
1517 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1518 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1519
1520 logicalrep_read_stream_prepare(s, &prepare_data);
1521 set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1522
1523 apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1524
1525 switch (apply_action)
1526 {
1527 case TRANS_LEADER_APPLY:
1528
1529 /*
1530 * The transaction has been serialized to file, so replay all the
1531 * spooled operations.
1532 */
1534 prepare_data.xid, prepare_data.prepare_lsn);
1535
1536 /* Mark the transaction as prepared. */
1537 apply_handle_prepare_internal(&prepare_data);
1538
1540
1541 /*
1542 * It is okay not to set the local_end LSN for the prepare because
1543 * we always flush the prepare record. See apply_handle_prepare.
1544 */
1546
1547 in_remote_transaction = false;
1548
1549 /* Unlink the files with serialized changes and subxact info. */
1551
1552 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1553 break;
1554
1556 Assert(winfo);
1557
1558 if (pa_send_data(winfo, s->len, s->data))
1559 {
1560 /* Finish processing the streaming transaction. */
1561 pa_xact_finish(winfo, prepare_data.end_lsn);
1562 break;
1563 }
1564
1565 /*
1566 * Switch to serialize mode when we are not able to send the
1567 * change to parallel apply worker.
1568 */
1569 pa_switch_to_partial_serialize(winfo, true);
1570
1571 /* fall through */
1573 Assert(winfo);
1574
1575 stream_open_and_write_change(prepare_data.xid,
1577 &original_msg);
1578
1580
1581 /* Finish processing the streaming transaction. */
1582 pa_xact_finish(winfo, prepare_data.end_lsn);
1583 break;
1584
1586
1587 /*
1588 * If the parallel apply worker is applying spooled messages then
1589 * close the file before preparing.
1590 */
1591 if (stream_fd)
1593
1595
1596 /* Mark the transaction as prepared. */
1597 apply_handle_prepare_internal(&prepare_data);
1598
1600
1602
1603 /*
1604 * It is okay not to set the local_end LSN for the prepare because
1605 * we always flush the prepare record. See apply_handle_prepare.
1606 */
1608
1611
1613
1614 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1615 break;
1616
1617 default:
1618 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1619 break;
1620 }
1621
1622 pgstat_report_stat(false);
1623
1624 /* Process any tables that are being synchronized in parallel. */
1625 process_syncing_tables(prepare_data.end_lsn);
1626
1627 /*
1628 * Similar to prepare case, the subskiplsn could be left in a case of
1629 * server crash but it's okay. See the comments in apply_handle_prepare().
1630 */
1633
1635
1637}
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:365

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1704 of file worker.c.

1705{
1706 bool first_segment;
1708 TransApplyAction apply_action;
1709
1710 /* Save the message before it is consumed. */
1711 StringInfoData original_msg = *s;
1712
1714 ereport(ERROR,
1715 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716 errmsg_internal("duplicate STREAM START message")));
1717
1718 /* There must not be an active streaming transaction. */
1720
1721 /* notify handle methods we're processing a remote transaction */
1723
1724 /* extract XID of the top-level transaction */
1725 stream_xid = logicalrep_read_stream_start(s, &first_segment);
1726
1728 ereport(ERROR,
1729 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1730 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1731
1733
1734 /* Try to allocate a worker for the streaming transaction. */
1735 if (first_segment)
1737
1738 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1739
1740 switch (apply_action)
1741 {
1743
1744 /*
1745 * Function stream_start_internal starts a transaction. This
1746 * transaction will be committed on the stream stop unless it is a
1747 * tablesync worker in which case it will be committed after
1748 * processing all the messages. We need this transaction for
1749 * handling the BufFile, used for serializing the streaming data
1750 * and subxact info.
1751 */
1752 stream_start_internal(stream_xid, first_segment);
1753 break;
1754
1756 Assert(winfo);
1757
1758 /*
1759 * Once we start serializing the changes, the parallel apply
1760 * worker will wait for the leader to release the stream lock
1761 * until the end of the transaction. So, we don't need to release
1762 * the lock or increment the stream count in that case.
1763 */
1764 if (pa_send_data(winfo, s->len, s->data))
1765 {
1766 /*
1767 * Unlock the shared object lock so that the parallel apply
1768 * worker can continue to receive changes.
1769 */
1770 if (!first_segment)
1772
1773 /*
1774 * Increment the number of streaming blocks waiting to be
1775 * processed by parallel apply worker.
1776 */
1778
1779 /* Cache the parallel apply worker for this transaction. */
1781 break;
1782 }
1783
1784 /*
1785 * Switch to serialize mode when we are not able to send the
1786 * change to parallel apply worker.
1787 */
1788 pa_switch_to_partial_serialize(winfo, !first_segment);
1789
1790 /* fall through */
1792 Assert(winfo);
1793
1794 /*
1795 * Open the spool file unless it was already opened when switching
1796 * to serialize mode. The transaction started in
1797 * stream_start_internal will be committed on the stream stop.
1798 */
1799 if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1800 stream_start_internal(stream_xid, first_segment);
1801
1803
1804 /* Cache the parallel apply worker for this transaction. */
1806 break;
1807
1809 if (first_segment)
1810 {
1811 /* Hold the lock until the end of the transaction. */
1814
1815 /*
1816 * Signal the leader apply worker, as it may be waiting for
1817 * us.
1818 */
1820 }
1821
1823 break;
1824
1825 default:
1826 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1827 break;
1828 }
1829
1831}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:496
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:5437
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1666
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:700
#define InvalidOid
Definition: postgres_ext.h:37
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1079
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert(), StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1863 of file worker.c.

1864{
1866 TransApplyAction apply_action;
1867
1869 ereport(ERROR,
1870 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1871 errmsg_internal("STREAM STOP message without STREAM START")));
1872
1873 apply_action = get_transaction_apply_action(stream_xid, &winfo);
1874
1875 switch (apply_action)
1876 {
1879 break;
1880
1882 Assert(winfo);
1883
1884 /*
1885 * Lock before sending the STREAM_STOP message so that the leader
1886 * can hold the lock first and the parallel apply worker will wait
1887 * for leader to release the lock. See Locking Considerations atop
1888 * applyparallelworker.c.
1889 */
1891
1892 if (pa_send_data(winfo, s->len, s->data))
1893 {
1895 break;
1896 }
1897
1898 /*
1899 * Switch to serialize mode when we are not able to send the
1900 * change to parallel apply worker.
1901 */
1902 pa_switch_to_partial_serialize(winfo, true);
1903
1904 /* fall through */
1909 break;
1910
1912 elog(DEBUG1, "applied %u changes in the streaming chunk",
1914
1915 /*
1916 * By the time parallel apply worker is processing the changes in
1917 * the current streaming block, the leader apply worker may have
1918 * sent multiple streaming blocks. This can lead to parallel apply
1919 * worker start waiting even when there are more chunk of streams
1920 * in the queue. So, try to lock only if there is no message left
1921 * in the queue. See Locking Considerations atop
1922 * applyparallelworker.c.
1923 *
1924 * Note that here we have a race condition where we can start
1925 * waiting even when there are pending streaming chunks. This can
1926 * happen if the leader sends another streaming block and acquires
1927 * the stream lock again after the parallel apply worker checks
1928 * that there is no pending streaming block and before it actually
1929 * starts waiting on a lock. We can handle this case by not
1930 * allowing the leader to increment the stream block count during
1931 * the time parallel apply worker acquires the lock but it is not
1932 * clear whether that is worth the complexity.
1933 *
1934 * Now, if this missed chunk contains rollback to savepoint, then
1935 * there is a risk of deadlock which probably shouldn't happen
1936 * after restart.
1937 */
1939 break;
1940
1941 default:
1942 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1943 break;
1944 }
1945
1948
1949 /*
1950 * The parallel apply worker could be in a transaction in which case we
1951 * need to report the state as STATE_IDLEINTRANSACTION.
1952 */
1955 else
1957
1959}
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1840
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5001

References AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3619 of file worker.c.

3620{
3621 bool cascade = false;
3622 bool restart_seqs = false;
3623 List *remote_relids = NIL;
3624 List *remote_rels = NIL;
3625 List *rels = NIL;
3626 List *part_rels = NIL;
3627 List *relids = NIL;
3628 List *relids_logged = NIL;
3629 ListCell *lc;
3630 LOCKMODE lockmode = AccessExclusiveLock;
3631
3632 /*
3633 * Quick return if we are skipping data modification changes or handling
3634 * streamed transactions.
3635 */
3636 if (is_skipping_changes() ||
3638 return;
3639
3641
3642 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3643
3644 foreach(lc, remote_relids)
3645 {
3646 LogicalRepRelId relid = lfirst_oid(lc);
3648
3649 rel = logicalrep_rel_open(relid, lockmode);
3651 {
3652 /*
3653 * The relation can't become interesting in the middle of the
3654 * transaction so it's safe to unlock it.
3655 */
3656 logicalrep_rel_close(rel, lockmode);
3657 continue;
3658 }
3659
3660 remote_rels = lappend(remote_rels, rel);
3662 rels = lappend(rels, rel->localrel);
3663 relids = lappend_oid(relids, rel->localreloid);
3665 relids_logged = lappend_oid(relids_logged, rel->localreloid);
3666
3667 /*
3668 * Truncate partitions if we got a message to truncate a partitioned
3669 * table.
3670 */
3671 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3672 {
3673 ListCell *child;
3674 List *children = find_all_inheritors(rel->localreloid,
3675 lockmode,
3676 NULL);
3677
3678 foreach(child, children)
3679 {
3680 Oid childrelid = lfirst_oid(child);
3681 Relation childrel;
3682
3683 if (list_member_oid(relids, childrelid))
3684 continue;
3685
3686 /* find_all_inheritors already got lock */
3687 childrel = table_open(childrelid, NoLock);
3688
3689 /*
3690 * Ignore temp tables of other backends. See similar code in
3691 * ExecuteTruncate().
3692 */
3693 if (RELATION_IS_OTHER_TEMP(childrel))
3694 {
3695 table_close(childrel, lockmode);
3696 continue;
3697 }
3698
3700 rels = lappend(rels, childrel);
3701 part_rels = lappend(part_rels, childrel);
3702 relids = lappend_oid(relids, childrelid);
3703 /* Log this relation only if needed for logical decoding */
3704 if (RelationIsLogicallyLogged(childrel))
3705 relids_logged = lappend_oid(relids_logged, childrelid);
3706 }
3707 }
3708 }
3709
3710 /*
3711 * Even if we used CASCADE on the upstream primary we explicitly default
3712 * to replaying changes without further cascading. This might be later
3713 * changeable with a user specified option.
3714 *
3715 * MySubscription->runasowner tells us whether we want to execute
3716 * replication actions as the subscription owner; the last argument to
3717 * TruncateGuts tells it whether we want to switch to the table owner.
3718 * Those are exactly opposite conditions.
3719 */
3721 relids,
3722 relids_logged,
3724 restart_seqs,
3726 foreach(lc, remote_rels)
3727 {
3728 LogicalRepRelMapEntry *rel = lfirst(lc);
3729
3731 }
3732 foreach(lc, part_rels)
3733 {
3734 Relation rel = lfirst(lc);
3735
3736 table_close(rel, NoLock);
3737 }
3738
3740}
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2398
#define ACL_TRUNCATE
Definition: parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:32
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:615
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:710
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:667
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition: tablecmds.c:1976

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 3325 of file worker.c.

3329{
3330 EState *estate = edata->estate;
3331 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3332 ResultRelInfo *relinfo = edata->targetRelInfo;
3333 Relation parentrel = relinfo->ri_RelationDesc;
3334 ModifyTableState *mtstate;
3335 PartitionTupleRouting *proute;
3336 ResultRelInfo *partrelinfo;
3337 Relation partrel;
3338 TupleTableSlot *remoteslot_part;
3339 TupleConversionMap *map;
3340 MemoryContext oldctx;
3341 LogicalRepRelMapEntry *part_entry = NULL;
3342 AttrMap *attrmap = NULL;
3343
3344 /* ModifyTableState is needed for ExecFindPartition(). */
3345 edata->mtstate = mtstate = makeNode(ModifyTableState);
3346 mtstate->ps.plan = NULL;
3347 mtstate->ps.state = estate;
3348 mtstate->operation = operation;
3349 mtstate->resultRelInfo = relinfo;
3350
3351 /* ... as is PartitionTupleRouting. */
3352 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3353
3354 /*
3355 * Find the partition to which the "search tuple" belongs.
3356 */
3357 Assert(remoteslot != NULL);
3359 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3360 remoteslot, estate);
3361 Assert(partrelinfo != NULL);
3362 partrel = partrelinfo->ri_RelationDesc;
3363
3364 /*
3365 * Check for supported relkind. We need this since partitions might be of
3366 * unsupported relkinds; and the set of partitions can change, so checking
3367 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3368 */
3369 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3371 RelationGetRelationName(partrel));
3372
3373 /*
3374 * To perform any of the operations below, the tuple must match the
3375 * partition's rowtype. Convert if needed or just copy, using a dedicated
3376 * slot to store the tuple in any case.
3377 */
3378 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3379 if (remoteslot_part == NULL)
3380 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3381 map = ExecGetRootToChildMap(partrelinfo, estate);
3382 if (map != NULL)
3383 {
3384 attrmap = map->attrMap;
3385 remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3386 remoteslot_part);
3387 }
3388 else
3389 {
3390 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3391 slot_getallattrs(remoteslot_part);
3392 }
3393 MemoryContextSwitchTo(oldctx);
3394
3395 /* Check if we can do the update or delete on the leaf partition. */
3396 if (operation == CMD_UPDATE || operation == CMD_DELETE)
3397 {
3398 part_entry = logicalrep_partition_open(relmapentry, partrel,
3399 attrmap);
3400 check_relation_updatable(part_entry);
3401 }
3402
3403 switch (operation)
3404 {
3405 case CMD_INSERT:
3406 apply_handle_insert_internal(edata, partrelinfo,
3407 remoteslot_part);
3408 break;
3409
3410 case CMD_DELETE:
3411 apply_handle_delete_internal(edata, partrelinfo,
3412 remoteslot_part,
3413 part_entry->localindexoid);
3414 break;
3415
3416 case CMD_UPDATE:
3417
3418 /*
3419 * For UPDATE, depending on whether or not the updated tuple
3420 * satisfies the partition's constraint, perform a simple UPDATE
3421 * of the partition or move the updated tuple into a different
3422 * suitable partition.
3423 */
3424 {
3425 TupleTableSlot *localslot;
3426 ResultRelInfo *partrelinfo_new;
3427 Relation partrel_new;
3428 bool found;
3429 EPQState epqstate;
3430 ConflictTupleInfo conflicttuple = {0};
3431
3432 /* Get the matching local tuple from the partition. */
3433 found = FindReplTupleInLocalRel(edata, partrel,
3434 &part_entry->remoterel,
3435 part_entry->localindexoid,
3436 remoteslot_part, &localslot);
3437 if (!found)
3438 {
3440 TupleTableSlot *newslot = localslot;
3441
3442 /*
3443 * Detecting whether the tuple was recently deleted or
3444 * never existed is crucial to avoid misleading the user
3445 * during conflict handling.
3446 */
3447 if (FindDeletedTupleInLocalRel(partrel,
3448 part_entry->localindexoid,
3449 remoteslot_part,
3450 &conflicttuple.xmin,
3451 &conflicttuple.origin,
3452 &conflicttuple.ts) &&
3453 conflicttuple.origin != replorigin_session_origin)
3455 else
3457
3458 /* Store the new tuple for conflict reporting */
3459 slot_store_data(newslot, part_entry, newtup);
3460
3461 /*
3462 * The tuple to be updated could not be found or was
3463 * deleted. Do nothing except for emitting a log message.
3464 */
3465 ReportApplyConflict(estate, partrelinfo, LOG,
3466 type, remoteslot_part, newslot,
3467 list_make1(&conflicttuple));
3468
3469 return;
3470 }
3471
3472 /*
3473 * Report the conflict if the tuple was modified by a
3474 * different origin.
3475 */
3476 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3477 &conflicttuple.origin,
3478 &conflicttuple.ts) &&
3479 conflicttuple.origin != replorigin_session_origin)
3480 {
3481 TupleTableSlot *newslot;
3482
3483 /* Store the new tuple for conflict reporting */
3484 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3485 slot_store_data(newslot, part_entry, newtup);
3486
3487 conflicttuple.slot = localslot;
3488
3489 ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3490 remoteslot_part, newslot,
3491 list_make1(&conflicttuple));
3492 }
3493
3494 /*
3495 * Apply the update to the local tuple, putting the result in
3496 * remoteslot_part.
3497 */
3499 slot_modify_data(remoteslot_part, localslot, part_entry,
3500 newtup);
3501 MemoryContextSwitchTo(oldctx);
3502
3503 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3504
3505 /*
3506 * Does the updated tuple still satisfy the current
3507 * partition's constraint?
3508 */
3509 if (!partrel->rd_rel->relispartition ||
3510 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3511 false))
3512 {
3513 /*
3514 * Yes, so simply UPDATE the partition. We don't call
3515 * apply_handle_update_internal() here, which would
3516 * normally do the following work, to avoid repeating some
3517 * work already done above to find the local tuple in the
3518 * partition.
3519 */
3520 InitConflictIndexes(partrelinfo);
3521
3522 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3524 ACL_UPDATE);
3525 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3526 localslot, remoteslot_part);
3527 }
3528 else
3529 {
3530 /* Move the tuple into the new partition. */
3531
3532 /*
3533 * New partition will be found using tuple routing, which
3534 * can only occur via the parent table. We might need to
3535 * convert the tuple to the parent's rowtype. Note that
3536 * this is the tuple found in the partition, not the
3537 * original search tuple received by this function.
3538 */
3539 if (map)
3540 {
3541 TupleConversionMap *PartitionToRootMap =
3543 RelationGetDescr(parentrel));
3544
3545 remoteslot =
3546 execute_attr_map_slot(PartitionToRootMap->attrMap,
3547 remoteslot_part, remoteslot);
3548 }
3549 else
3550 {
3551 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3552 slot_getallattrs(remoteslot);
3553 }
3554
3555 /* Find the new partition. */
3557 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3558 proute, remoteslot,
3559 estate);
3560 MemoryContextSwitchTo(oldctx);
3561 Assert(partrelinfo_new != partrelinfo);
3562 partrel_new = partrelinfo_new->ri_RelationDesc;
3563
3564 /* Check that new partition also has supported relkind. */
3565 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3567 RelationGetRelationName(partrel_new));
3568
3569 /* DELETE old tuple found in the old partition. */
3570 EvalPlanQualSetSlot(&epqstate, localslot);
3572 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3573
3574 /* INSERT new tuple into the new partition. */
3575
3576 /*
3577 * Convert the replacement tuple to match the destination
3578 * partition rowtype.
3579 */
3581 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3582 if (remoteslot_part == NULL)
3583 remoteslot_part = table_slot_create(partrel_new,
3584 &estate->es_tupleTable);
3585 map = ExecGetRootToChildMap(partrelinfo_new, estate);
3586 if (map != NULL)
3587 {
3588 remoteslot_part = execute_attr_map_slot(map->attrMap,
3589 remoteslot,
3590 remoteslot_part);
3591 }
3592 else
3593 {
3594 remoteslot_part = ExecCopySlot(remoteslot_part,
3595 remoteslot);
3596 slot_getallattrs(remoteslot);
3597 }
3598 MemoryContextSwitchTo(oldctx);
3599 apply_handle_insert_internal(edata, partrelinfo_new,
3600 remoteslot_part);
3601 }
3602
3603 EvalPlanQualEnd(&epqstate);
3604 }
3605 break;
3606
3607 default:
3608 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3609 break;
3610 }
3611}
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:1112
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
Definition: worker.c:3244
ConflictType
Definition: conflict.h:32
@ CT_UPDATE_DELETED
Definition: conflict.h:43
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:37
@ CT_UPDATE_MISSING
Definition: conflict.h:46
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1846
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1326
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
@ CMD_UPDATE
Definition: nodes.h:276
#define makeNode(_type_)
Definition: nodes.h:161
#define ACL_UPDATE
Definition: parsenodes.h:78
#define RelationGetRelationName(relation)
Definition: rel.h:548
#define RelationGetNamespace(relation)
Definition: rel.h:555
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:633
PartitionTupleRouting * proute
Definition: worker.c:319
ModifyTableState * mtstate
Definition: worker.c:318
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:712
CmdType operation
Definition: execnodes.h:1398
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1402
PlanState ps
Definition: execnodes.h:1397
Plan * plan
Definition: execnodes.h:1159
EState * state
Definition: execnodes.h:1161
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:619
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:103
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:193
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:371
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:524
const char * type

References ACL_DELETE, ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationDelete(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LogicalRepRelMapEntry::localindexoid, LOG, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, ConflictTupleInfo::origin, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, slot_getallattrs(), slot_modify_data(), slot_store_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ApplyExecutionData::targetRelInfo, ConflictTupleInfo::ts, type, and ConflictTupleInfo::xmin.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2561 of file worker.c.

2562{
2563 LogicalRepTyp typ;
2564
2566 return;
2567
2568 logicalrep_read_typ(s, &typ);
2569}
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:754

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2765 of file worker.c.

2766{
2768 LogicalRepRelId relid;
2769 UserContext ucxt;
2770 ApplyExecutionData *edata;
2771 EState *estate;
2772 LogicalRepTupleData oldtup;
2773 LogicalRepTupleData newtup;
2774 bool has_oldtup;
2775 TupleTableSlot *remoteslot;
2776 RTEPermissionInfo *target_perminfo;
2777 MemoryContext oldctx;
2778 bool run_as_owner;
2779
2780 /*
2781 * Quick return if we are skipping data modification changes or handling
2782 * streamed transactions.
2783 */
2784 if (is_skipping_changes() ||
2786 return;
2787
2789
2790 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2791 &newtup);
2794 {
2795 /*
2796 * The relation can't become interesting in the middle of the
2797 * transaction so it's safe to unlock it.
2798 */
2801 return;
2802 }
2803
2804 /* Set relation for error callback */
2806
2807 /* Check if we can do the update. */
2809
2810 /*
2811 * Make sure that any user-supplied code runs as the table owner, unless
2812 * the user has opted out of that behavior.
2813 */
2814 run_as_owner = MySubscription->runasowner;
2815 if (!run_as_owner)
2816 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2817
2818 /* Initialize the executor state. */
2819 edata = create_edata_for_relation(rel);
2820 estate = edata->estate;
2821 remoteslot = ExecInitExtraTupleSlot(estate,
2823 &TTSOpsVirtual);
2824
2825 /*
2826 * Populate updatedCols so that per-column triggers can fire, and so
2827 * executor can correctly pass down indexUnchanged hint. This could
2828 * include more columns than were actually changed on the publisher
2829 * because the logical replication protocol doesn't contain that
2830 * information. But it would for example exclude columns that only exist
2831 * on the subscriber, since we are not touching those.
2832 */
2833 target_perminfo = list_nth(estate->es_rteperminfos, 0);
2834 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2835 {
2837 int remoteattnum = rel->attrmap->attnums[i];
2838
2839 if (!att->attisdropped && remoteattnum >= 0)
2840 {
2841 Assert(remoteattnum < newtup.ncols);
2842 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2843 target_perminfo->updatedCols =
2844 bms_add_member(target_perminfo->updatedCols,
2846 }
2847 }
2848
2849 /* Build the search tuple. */
2851 slot_store_data(remoteslot, rel,
2852 has_oldtup ? &oldtup : &newtup);
2853 MemoryContextSwitchTo(oldctx);
2854
2855 /* For a partitioned table, apply update to correct partition. */
2856 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2858 remoteslot, &newtup, CMD_UPDATE);
2859 else
2861 remoteslot, &newtup, rel->localindexoid);
2862
2863 finish_edata(edata);
2864
2865 /* Reset relation for error callback */
2867
2868 if (!run_as_owner)
2869 RestoreUserContext(&ucxt);
2870
2872
2874}
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2882
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
int i
Definition: isn.c:77
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:487
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:668
Bitmapset * updatedCols
Definition: parsenodes.h:1326
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:122
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr(), and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2882 of file worker.c.

2887{
2888 EState *estate = edata->estate;
2889 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2890 Relation localrel = relinfo->ri_RelationDesc;
2891 EPQState epqstate;
2892 TupleTableSlot *localslot = NULL;
2893 ConflictTupleInfo conflicttuple = {0};
2894 bool found;
2895 MemoryContext oldctx;
2896
2897 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2898 ExecOpenIndices(relinfo, false);
2899
2900 found = FindReplTupleInLocalRel(edata, localrel,
2901 &relmapentry->remoterel,
2902 localindexoid,
2903 remoteslot, &localslot);
2904
2905 /*
2906 * Tuple found.
2907 *
2908 * Note this will fail if there are other conflicting unique indexes.
2909 */
2910 if (found)
2911 {
2912 /*
2913 * Report the conflict if the tuple was modified by a different
2914 * origin.
2915 */
2916 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2917 &conflicttuple.origin, &conflicttuple.ts) &&
2918 conflicttuple.origin != replorigin_session_origin)
2919 {
2920 TupleTableSlot *newslot;
2921
2922 /* Store the new tuple for conflict reporting */
2923 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2924 slot_store_data(newslot, relmapentry, newtup);
2925
2926 conflicttuple.slot = localslot;
2927
2929 remoteslot, newslot,
2930 list_make1(&conflicttuple));
2931 }
2932
2933 /* Process and store remote tuple in the slot */
2935 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2936 MemoryContextSwitchTo(oldctx);
2937
2938 EvalPlanQualSetSlot(&epqstate, remoteslot);
2939
2940 InitConflictIndexes(relinfo);
2941
2942 /* Do the actual update. */
2944 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2945 remoteslot);
2946 }
2947 else
2948 {
2950 TupleTableSlot *newslot = localslot;
2951
2952 /*
2953 * Detecting whether the tuple was recently deleted or never existed
2954 * is crucial to avoid misleading the user during conflict handling.
2955 */
2956 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2957 &conflicttuple.xmin,
2958 &conflicttuple.origin,
2959 &conflicttuple.ts) &&
2960 conflicttuple.origin != replorigin_session_origin)
2962 else
2964
2965 /* Store the new tuple for conflict reporting */
2966 slot_store_data(newslot, relmapentry, newtup);
2967
2968 /*
2969 * The tuple to be updated could not be found or was deleted. Do
2970 * nothing except for emitting a log message.
2971 */
2972 ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2973 list_make1(&conflicttuple));
2974 }
2975
2976 /* Cleanup. */
2977 ExecCloseIndices(relinfo);
2978 EvalPlanQualEnd(&epqstate);
2979}

References ACL_UPDATE, CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LOG, MemoryContextSwitchTo(), NIL, ConflictTupleInfo::origin, LogicalRepRelMapEntry::remoterel, replorigin_session_origin, ReportApplyConflict(), ResultRelInfo::ri_RelationDesc, ConflictTupleInfo::slot, slot_modify_data(), slot_store_data(), table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, ConflictTupleInfo::ts, type, and ConflictTupleInfo::xmin.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2238 of file worker.c.

2240{
2241 int nchanges;
2242 char path[MAXPGPATH];
2243 char *buffer = NULL;
2244 MemoryContext oldcxt;
2245 ResourceOwner oldowner;
2246 int fileno;
2247 off_t offset;
2248
2251
2252 /* Make sure we have an open transaction */
2254
2255 /*
2256 * Allocate file handle and memory required to process all the messages in
2257 * TopTransactionContext to avoid them getting reset after each message is
2258 * processed.
2259 */
2261
2262 /* Open the spool file for the committed/prepared transaction */
2264 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2265
2266 /*
2267 * Make sure the file is owned by the toplevel transaction so that the
2268 * file will not be accidentally closed when aborting a subtransaction.
2269 */
2270 oldowner = CurrentResourceOwner;
2272
2273 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2274
2275 CurrentResourceOwner = oldowner;
2276
2277 buffer = palloc(BLCKSZ);
2278
2279 MemoryContextSwitchTo(oldcxt);
2280
2281 remote_final_lsn = lsn;
2282
2283 /*
2284 * Make sure the handle apply_dispatch methods are aware we're in a remote
2285 * transaction.
2286 */
2287 in_remote_transaction = true;
2289
2291
2292 /*
2293 * Read the entries one by one and pass them through the same logic as in
2294 * apply_dispatch.
2295 */
2296 nchanges = 0;
2297 while (true)
2298 {
2300 size_t nbytes;
2301 int len;
2302
2304
2305 /* read length of the on-disk record */
2306 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2307
2308 /* have we reached end of the file? */
2309 if (nbytes == 0)
2310 break;
2311
2312 /* do we have a correct length? */
2313 if (len <= 0)
2314 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2315 len, path);
2316
2317 /* make sure we have sufficiently large buffer */
2318 buffer = repalloc(buffer, len);
2319
2320 /* and finally read the data into the buffer */
2321 BufFileReadExact(stream_fd, buffer, len);
2322
2323 BufFileTell(stream_fd, &fileno, &offset);
2324
2325 /* init a stringinfo using the buffer and call apply_dispatch */
2326 initReadOnlyStringInfo(&s2, buffer, len);
2327
2328 /* Ensure we are reading the data into our memory context. */
2330
2332
2334
2335 MemoryContextSwitchTo(oldcxt);
2336
2337 nchanges++;
2338
2339 /*
2340 * It is possible the file has been closed because we have processed
2341 * the transaction end message like stream_commit in which case that
2342 * must be the last message.
2343 */
2344 if (!stream_fd)
2345 {
2346 ensure_last_message(stream_fileset, xid, fileno, offset);
2347 break;
2348 }
2349
2350 if (nchanges % 1000 == 0)
2351 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2352 nchanges, path);
2353 }
2354
2355 if (stream_fd)
2357
2358 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2359 nchanges, path);
2360
2361 return;
2362}
MemoryContext ApplyMessageContext
Definition: worker.c:471
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5336
void apply_dispatch(StringInfo s)
Definition: worker.c:3747
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:2206
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopTransactionContext
Definition: mcxt.c:171
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1610
void * palloc(Size size)
Definition: mcxt.c:1365
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:175
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 4973 of file worker.c.

4974{
4976 {
4977 /*
4978 * Don't stop the parallel apply worker as the leader will detect the
4979 * subscription parameter change and restart logical replication later
4980 * anyway. This also prevents the leader from reporting errors when
4981 * trying to communicate with a stopped parallel apply worker, which
4982 * would accidentally disable subscriptions if disable_on_error was
4983 * set.
4984 */
4985 return;
4986 }
4987
4988 /*
4989 * Reset the last-start time for this apply worker so that the launcher
4990 * will restart it without waiting for wal_retrieve_retry_interval if the
4991 * subscription is still active, and so that we won't leak that hash table
4992 * entry if it isn't.
4993 */
4996
4997 proc_exit(0);
4998}
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1101
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), maybe_reread_subscription(), and resume_conflict_info_retention().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 5885 of file worker.c.

5886{
5887 int worker_slot = DatumGetInt32(main_arg);
5888
5890
5891 SetupApplyOrSyncWorker(worker_slot);
5892
5894
5896
5897 proc_exit(0);
5898}
bool InitializingApplyWorker
Definition: worker.c:499
static void run_apply_worker()
Definition: worker.c:5592
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5831
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 6228 of file worker.c.

6229{
6230 if (isCommit && on_commit_wakeup_workers_subids != NIL)
6231 {
6232 ListCell *lc;
6233
6234 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6236 {
6237 Oid subid = lfirst_oid(lc);
6238 List *workers;
6239 ListCell *lc2;
6240
6241 workers = logicalrep_workers_find(subid, true, false);
6242 foreach(lc2, workers)
6243 {
6244 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6245
6247 }
6248 }
6249 LWLockRelease(LogicalRepWorkerLock);
6250 }
6251
6252 /* The List storage will be reclaimed automatically in xact cleanup. */
6254}
static List * on_commit_wakeup_workers_subids
Definition: worker.c:482
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:286
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:720
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ can_advance_nonremovable_xid()

static bool can_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4370 of file worker.c.

4371{
4372 /*
4373 * It is sufficient to manage non-removable transaction ID for a
4374 * subscription by the main apply worker to detect update_deleted reliably
4375 * even for table sync or parallel apply workers.
4376 */
4378 return false;
4379
4380 /* No need to advance if retaining dead tuples is not required */
4382 return false;
4383
4384 return true;
4385}

References am_leader_apply_worker(), MySubscription, and Subscription::retaindeadtuples.

Referenced by maybe_advance_nonremovable_xid().

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 5336 of file worker.c.

5337{
5338 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5339}
#define snprintf
Definition: port.h:239

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2724 of file worker.c.

2725{
2726 /*
2727 * For partitioned tables, we only need to care if the target partition is
2728 * updatable (aka has PK or RI defined for it).
2729 */
2730 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2731 return;
2732
2733 /* Updatable, no error. */
2734 if (rel->updatable)
2735 return;
2736
2737 /*
2738 * We are in error mode so it's fine this is somewhat slow. It's better to
2739 * give user correct error.
2740 */
2742 {
2743 ereport(ERROR,
2744 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2745 errmsg("publisher did not send replica identity column "
2746 "expected by the logical replication target relation \"%s.%s\"",
2747 rel->remoterel.nspname, rel->remoterel.relname)));
2748 }
2749
2750 ereport(ERROR,
2751 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2752 errmsg("logical replication target relation \"%s.%s\" has "
2753 "neither REPLICA IDENTITY index nor PRIMARY "
2754 "KEY and published relation does not have "
2755 "REPLICA IDENTITY FULL",
2756 rel->remoterel.nspname, rel->remoterel.relname)));
2757}
#define OidIsValid(objectId)
Definition: c.h:774
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:891

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 5533 of file worker.c.

5534{
5537
5538 subxact_data.subxacts = NULL;
5542}
static ApplySubXactData subxact_data
Definition: worker.c:545
void pfree(void *pointer)
Definition: mcxt.c:1594
uint32 nsubxacts
Definition: worker.c:539
uint32 nsubxacts_max
Definition: worker.c:540
SubXactInfo * subxacts
Definition: worker.c:542
TransactionId subxact_last
Definition: worker.c:541

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 6029 of file worker.c.

6030{
6031 Relation rel;
6032 Form_pg_subscription subform;
6033 HeapTuple tup;
6034 XLogRecPtr myskiplsn = MySubscription->skiplsn;
6035 bool started_tx = false;
6036
6038 return;
6039
6040 if (!IsTransactionState())
6041 {
6043 started_tx = true;
6044 }
6045
6046 /*
6047 * Updating pg_subscription might involve TOAST table access, so ensure we
6048 * have a valid snapshot.
6049 */
6051
6052 /*
6053 * Protect subskiplsn of pg_subscription from being concurrently updated
6054 * while clearing it.
6055 */
6056 LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6058
6059 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6060
6061 /* Fetch the existing tuple. */
6062 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6064
6065 if (!HeapTupleIsValid(tup))
6066 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6067
6068 subform = (Form_pg_subscription) GETSTRUCT(tup);
6069
6070 /*
6071 * Clear the subskiplsn. If the user has already changed subskiplsn before
6072 * clearing it we don't update the catalog and the replication origin
6073 * state won't get advanced. So in the worst case, if the server crashes
6074 * before sending an acknowledgment of the flush position the transaction
6075 * will be sent again and the user needs to set subskiplsn again. We can
6076 * reduce the possibility by logging a replication origin WAL record to
6077 * advance the origin LSN instead but there is no way to advance the
6078 * origin timestamp and it doesn't seem to be worth doing anything about
6079 * it since it's a very rare case.
6080 */
6081 if (subform->subskiplsn == myskiplsn)
6082 {
6083 bool nulls[Natts_pg_subscription];
6084 bool replaces[Natts_pg_subscription];
6085 Datum values[Natts_pg_subscription];
6086
6087 memset(values, 0, sizeof(values));
6088 memset(nulls, false, sizeof(nulls));
6089 memset(replaces, false, sizeof(replaces));
6090
6091 /* reset subskiplsn */
6092 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6093 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6094
6095 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6096 replaces);
6097 CatalogTupleUpdate(rel, &tup->t_self, tup);
6098
6099 if (myskiplsn != finish_lsn)
6101 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6102 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6103 LSN_FORMAT_ARGS(finish_lsn),
6104 LSN_FORMAT_ARGS(myskiplsn)));
6105 }
6106
6107 heap_freetuple(tup);
6108 table_close(rel, NoLock);
6109
6111
6112 if (started_tx)
6114}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define likely(x)
Definition: c.h:401
int errdetail(const char *fmt,...)
Definition: elog.c:1207
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
void PopActiveSnapshot(void)
Definition: snapmgr.c:773
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT(), GetTransactionSnapshot(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData * create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 865 of file worker.c.

866{
867 ApplyExecutionData *edata;
868 EState *estate;
869 RangeTblEntry *rte;
870 List *perminfos = NIL;
871 ResultRelInfo *resultRelInfo;
872
873 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
874 edata->targetRel = rel;
875
876 edata->estate = estate = CreateExecutorState();
877
878 rte = makeNode(RangeTblEntry);
879 rte->rtekind = RTE_RELATION;
880 rte->relid = RelationGetRelid(rel->localrel);
881 rte->relkind = rel->localrel->rd_rel->relkind;
882 rte->rellockmode = AccessShareLock;
883
884 addRTEPermissionInfo(&perminfos, rte);
885
886 ExecInitRangeTable(estate, list_make1(rte), perminfos,
888
889 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
890
891 /*
892 * Use Relation opened by logicalrep_rel_open() instead of opening it
893 * again.
894 */
895 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
896
897 /*
898 * We put the ResultRelInfo in the es_opened_result_relations list, even
899 * though we don't populate the es_result_relations array. That's a bit
900 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
901 *
902 * ExecOpenIndices() is not called here either, each execution path doing
903 * an apply operation being responsible for that.
904 */
906 lappend(estate->es_opened_result_relations, resultRelInfo);
907
908 estate->es_output_cid = GetCurrentCommandId(true);
909
910 /* Prepare to catch AFTER triggers. */
912
913 /* other fields of edata remain NULL for now */
914
915 return edata;
916}
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1243
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:773
EState * CreateExecutorState(void)
Definition: execUtils.c:88
void * palloc0(Size size)
Definition: mcxt.c:1395
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1043
#define RelationGetRelid(relation)
Definition: rel.h:514
List * es_opened_result_relations
Definition: execnodes.h:688
CommandId es_output_cid
Definition: execnodes.h:682
RTEKind rtekind
Definition: parsenodes.h:1078
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5104
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:829

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), bms_make_singleton(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 5905 of file worker.c.

5906{
5907 /*
5908 * Emit the error message, and recover from the error state to an idle
5909 * state
5910 */
5912
5916
5918
5919 /* Report the worker failed during either table synchronization or apply */
5922
5923 /* Disable the subscription */
5925
5926 /*
5927 * Updating pg_subscription might involve TOAST table access, so ensure we
5928 * have a valid snapshot.
5929 */
5931
5935
5936 /* Ensure we remove no-longer-useful entry for worker's start time */
5939
5940 /* Notify the subscription has been disabled and exit */
5941 ereport(LOG,
5942 errmsg("subscription \"%s\" has been disabled because of an error",
5944
5945 /*
5946 * Skip the track_commit_timestamp check when disabling the worker due to
5947 * an error, as verifying commit timestamps is unnecessary in this
5948 * context.
5949 */
5953
5954 proc_exit(0);
5955}
void EmitErrorReport(void)
Definition: elog.c:1695
void FlushErrorState(void)
Definition: elog.c:1875
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4874

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CheckSubDeadTupleRetention(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), PopActiveSnapshot(), proc_exit(), PushActiveSnapshot(), RESUME_INTERRUPTS, Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), LogicalRepWorker::subid, and WARNING.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 2206 of file worker.c.

2208{
2209 char path[MAXPGPATH];
2210 BufFile *fd;
2211 int last_fileno;
2212 off_t last_offset;
2213
2215
2217
2219
2220 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2221
2222 BufFileSeek(fd, 0, 0, SEEK_END);
2223 BufFileTell(fd, &last_fileno, &last_offset);
2224
2226
2228
2229 if (last_fileno != fileno || last_offset != offset)
2230 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2231 path);
2232}
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:740
void BufFileClose(BufFile *file)
Definition: buffile.c:412
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindDeletedTupleInLocalRel()

static bool FindDeletedTupleInLocalRel ( Relation  localrel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TransactionId delete_xid,
RepOriginId delete_origin,
TimestampTz delete_time 
)
static

Definition at line 3244 of file worker.c.

3248{
3249 TransactionId oldestxmin;
3250
3251 /*
3252 * Return false if either dead tuples are not retained or commit timestamp
3253 * data is not available.
3254 */
3256 return false;
3257
3258 /*
3259 * For conflict detection, we use the leader worker's
3260 * oldest_nonremovable_xid value instead of invoking
3261 * GetOldestNonRemovableTransactionId() or using the conflict detection
3262 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3263 * identify tuples that were recently deleted. These deleted tuples are no
3264 * longer visible to concurrent transactions. However, if a remote update
3265 * matches such a tuple, we log an update_deleted conflict.
3266 *
3267 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3268 * transaction IDs older than oldest_nonremovable_xid, for our current
3269 * purpose, it is acceptable to treat tuples deleted by transactions prior
3270 * to oldest_nonremovable_xid as update_missing conflicts.
3271 */
3273 {
3275 }
3276 else
3277 {
3278 LogicalRepWorker *leader;
3279
3280 /*
3281 * Obtain the information from the leader apply worker as only the
3282 * leader manages oldest_nonremovable_xid (see
3283 * maybe_advance_nonremovable_xid() for details).
3284 */
3285 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3287 InvalidOid, false);
3288 if (!leader)
3289 {
3290 ereport(ERROR,
3291 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3292 errmsg("could not detect conflict as the leader apply worker has exited")));
3293 }
3294
3295 SpinLockAcquire(&leader->relmutex);
3296 oldestxmin = leader->oldest_nonremovable_xid;
3297 SpinLockRelease(&leader->relmutex);
3298 LWLockRelease(LogicalRepWorkerLock);
3299 }
3300
3301 /*
3302 * Return false if the leader apply worker has stopped retaining
3303 * information for detecting conflicts. This implies that update_deleted
3304 * can no longer be reliably detected.
3305 */
3306 if (!TransactionIdIsValid(oldestxmin))
3307 return false;
3308
3309 if (OidIsValid(localidxoid) &&
3310 IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3311 return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3312 remoteslot, oldestxmin,
3313 delete_xid, delete_origin,
3314 delete_time);
3315 else
3316 return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3317 oldestxmin, delete_xid,
3318 delete_origin, delete_time);
3319}
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition: worker.c:3210
bool track_commit_timestamp
Definition: commit_ts.c:109
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:254
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TransactionId oldest_nonremovable_xid

References am_leader_apply_worker(), ereport, errcode(), errmsg(), ERROR, InvalidOid, IsIndexUsableForFindingDeletedTuple(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, MySubscription, OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, RelationFindDeletedTupleInfoByIndex(), RelationFindDeletedTupleInfoSeq(), LogicalRepWorker::relmutex, Subscription::retaindeadtuples, SpinLockAcquire, SpinLockRelease, LogicalRepWorker::subid, track_commit_timestamp, and TransactionIdIsValid.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 3149 of file worker.c.

3154{
3155 EState *estate = edata->estate;
3156 bool found;
3157
3158 /*
3159 * Regardless of the top-level operation, we're performing a read here, so
3160 * check for SELECT privileges.
3161 */
3163
3164 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3165
3166 Assert(OidIsValid(localidxoid) ||
3167 (remoterel->replident == REPLICA_IDENTITY_FULL));
3168
3169 if (OidIsValid(localidxoid))
3170 {
3171#ifdef USE_ASSERT_CHECKING
3172 Relation idxrel = index_open(localidxoid, AccessShareLock);
3173
3174 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3175 Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3176 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3178 edata->targetRel->attrmap)));
3180#endif
3181
3182 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3184 remoteslot, *localslot);
3185 }
3186 else
3188 remoteslot, *localslot);
3189
3190 return found;
3191}
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition: relation.c:821

References AccessShareLock, ACL_SELECT, Assert(), LogicalRepRelMapEntry::attrmap, EState::es_tupleTable, ApplyExecutionData::estate, GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 923 of file worker.c.

924{
925 EState *estate = edata->estate;
926
927 /* Handle any queued AFTER triggers. */
928 AfterTriggerEndQuery(estate);
929
930 /* Shut down tuple routing, if any was done. */
931 if (edata->proute)
932 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
933
934 /*
935 * Cleanup. It might seem that we should call ExecCloseResultRelations()
936 * here, but we intentionally don't. It would close the rel we added to
937 * es_opened_result_relations above, which is wrong because we took no
938 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
939 * any other relations opened during execution.
940 */
941 ExecResetTupleTable(estate->es_tupleTable, false);
942 FreeExecutorState(estate);
943 pfree(edata);
944}
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1380
void FreeExecutorState(EState *estate)
Definition: execUtils.c:192
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5124

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_candidate_xid()

static void get_candidate_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4422 of file worker.c.

4423{
4424 TransactionId oldest_running_xid;
4426
4427 /*
4428 * Use last_recv_time when applying changes in the loop to avoid
4429 * unnecessary system time retrieval. If last_recv_time is not available,
4430 * obtain the current timestamp.
4431 */
4432 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4433
4434 /*
4435 * Compute the candidate_xid and request the publisher status at most once
4436 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4437 * details on how this value is dynamically adjusted. This is to avoid
4438 * using CPU and network resources without making much progress.
4439 */
4441 rdt_data->xid_advance_interval))
4442 return;
4443
4444 /*
4445 * Immediately update the timer, even if the function returns later
4446 * without setting candidate_xid due to inactivity on the subscriber. This
4447 * avoids frequent calls to GetOldestActiveTransactionId.
4448 */
4449 rdt_data->candidate_xid_time = now;
4450
4451 /*
4452 * Consider transactions in the current database, as only dead tuples from
4453 * this database are required for conflict detection.
4454 */
4455 oldest_running_xid = GetOldestActiveTransactionId(false, false);
4456
4457 /*
4458 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4459 * of its previously computed value.
4460 */
4462 oldest_running_xid));
4463
4464 /* Return if the oldest_nonremovable_xid cannot be advanced */
4466 oldest_running_xid))
4467 {
4468 adjust_xid_advance_interval(rdt_data, false);
4469 return;
4470 }
4471
4472 adjust_xid_advance_interval(rdt_data, true);
4473
4474 rdt_data->candidate_xid = oldest_running_xid;
4476
4477 /* process the next phase */
4478 process_rdt_phase_transition(rdt_data, false);
4479}
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
Definition: worker.c:4924
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4392
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
int64 TimestampTz
Definition: timestamp.h:39
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition: procarray.c:2833
TimestampTz last_recv_time
Definition: worker.c:443
TimestampTz candidate_xid_time
Definition: worker.c:444
RetainDeadTuplesPhase phase
Definition: worker.c:403
TransactionId candidate_xid
Definition: worker.c:430
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References adjust_xid_advance_interval(), Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, GetCurrentTimestamp(), GetOldestActiveTransactionId(), RetainDeadTuplesData::last_recv_time, MyLogicalRepWorker, now(), LogicalRepWorker::oldest_nonremovable_xid, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, TimestampDifferenceExceeds(), TransactionIdEquals, TransactionIdPrecedesOrEquals(), and RetainDeadTuplesData::xid_advance_interval.

Referenced by process_rdt_phase_transition().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool *  have_pending_txes 
)
static

Definition at line 3867 of file worker.c.

3869{
3870 dlist_mutable_iter iter;
3871 XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3872
3874 *flush = InvalidXLogRecPtr;
3875
3877 {
3878 FlushPosition *pos =
3879 dlist_container(FlushPosition, node, iter.cur);
3880
3881 *write = pos->remote_end;
3882
3883 if (pos->local_end <= local_flush)
3884 {
3885 *flush = pos->remote_end;
3886 dlist_delete(iter.cur);
3887 pfree(pos);
3888 }
3889 else
3890 {
3891 /*
3892 * Don't want to uselessly iterate over the rest of the list which
3893 * could potentially be long. Instead get the last element and
3894 * grab the write position from there.
3895 */
3897 &lsn_mapping);
3898 *write = pos->remote_end;
3899 *have_pending_txes = true;
3900 return;
3901 }
3902 }
3903
3904 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3905}
static dlist_head lsn_mapping
Definition: worker.c:308
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:305
XLogRecPtr local_end
Definition: worker.c:304
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6551

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback(), and wait_for_local_flush().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 6275 of file worker.c.

6276{
6277 *winfo = NULL;
6278
6280 {
6281 return TRANS_PARALLEL_APPLY;
6282 }
6283
6284 /*
6285 * If we are processing this transaction using a parallel apply worker
6286 * then either we send the changes to the parallel worker or if the worker
6287 * is busy then serialize the changes to the file which will later be
6288 * processed by the parallel worker.
6289 */
6290 *winfo = pa_find_worker(xid);
6291
6292 if (*winfo && (*winfo)->serialize_changes)
6293 {
6295 }
6296 else if (*winfo)
6297 {
6299 }
6300
6301 /*
6302 * If there is no parallel worker involved to process this transaction
6303 * then we either directly apply the change or serialize it to a file
6304 * which will later be applied when the transaction finish message is
6305 * processed.
6306 */
6307 else if (in_streamed_transaction)
6308 {
6310 }
6311 else
6312 {
6313 return TRANS_LEADER_APPLY;
6314 }
6315}
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 772 of file worker.c.

773{
774 TransactionId current_xid;
776 TransApplyAction apply_action;
777 StringInfoData original_msg;
778
779 apply_action = get_transaction_apply_action(stream_xid, &winfo);
780
781 /* not in streaming mode */
782 if (apply_action == TRANS_LEADER_APPLY)
783 return false;
784
786
787 /*
788 * The parallel apply worker needs the xid in this message to decide
789 * whether to define a savepoint, so save the original message that has
790 * not moved the cursor after the xid. We will serialize this message to a
791 * file in PARTIAL_SERIALIZE mode.
792 */
793 original_msg = *s;
794
795 /*
796 * We should have received XID of the subxact as the first part of the
797 * message, so extract it.
798 */
799 current_xid = pq_getmsgint(s, 4);
800
801 if (!TransactionIdIsValid(current_xid))
803 (errcode(ERRCODE_PROTOCOL_VIOLATION),
804 errmsg_internal("invalid transaction ID in streamed replication transaction")));
805
806 switch (apply_action)
807 {
810
811 /* Add the new subxact to the array (unless already there). */
812 subxact_info_add(current_xid);
813
814 /* Write the change to the current file */
816 return true;
817
819 Assert(winfo);
820
821 /*
822 * XXX The publisher side doesn't always send relation/type update
823 * messages after the streaming transaction, so also update the
824 * relation/type in leader apply worker. See function
825 * cleanup_rel_sync_cache.
826 */
827 if (pa_send_data(winfo, s->len, s->data))
828 return (action != LOGICAL_REP_MSG_RELATION &&
830
831 /*
832 * Switch to serialize mode when we are not able to send the
833 * change to parallel apply worker.
834 */
835 pa_switch_to_partial_serialize(winfo, false);
836
837 /* fall through */
839 stream_write_change(action, &original_msg);
840
841 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
842 return (action != LOGICAL_REP_MSG_RELATION &&
844
847
848 /* Define a savepoint for a subxact if needed. */
849 pa_start_subtrans(current_xid, stream_xid);
850 return false;
851
852 default:
853 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
854 return false; /* silence compiler warning */
855 }
856}
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:5251
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415

References generate_unaccent_rules::action, Assert(), StringInfoData::data, elog, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 5705 of file worker.c.

5706{
5707 MemoryContext oldctx;
5708
5709 /* Run as replica session replication role. */
5710 SetConfigOption("session_replication_role", "replica",
5712
5713 /* Connect to our database. */
5716 0);
5717
5718 /*
5719 * Set always-secure search path, so malicious users can't redirect user
5720 * code (e.g. pg_index.indexprs).
5721 */
5722 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5723
5724 /* Load the subscription into persistent memory context. */
5726 "ApplyContext",
5730
5731 /*
5732 * Lock the subscription to prevent it from being concurrently dropped,
5733 * then re-verify its existence. After the initialization, the worker will
5734 * be terminated gracefully if the subscription is dropped.
5735 */
5736 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5739 if (!MySubscription)
5740 {
5741 ereport(LOG,
5742 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5744
5745 /* Ensure we remove no-longer-useful entry for worker's start time */
5748
5749 proc_exit(0);
5750 }
5751
5752 MySubscriptionValid = true;
5753 MemoryContextSwitchTo(oldctx);
5754
5755 if (!MySubscription->enabled)
5756 {
5757 ereport(LOG,
5758 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5759 MySubscription->name)));
5760
5762 }
5763
5764 /*
5765 * Restart the worker if retain_dead_tuples was enabled during startup.
5766 *
5767 * At this point, the replication slot used for conflict detection might
5768 * not exist yet, or could be dropped soon if the launcher perceives
5769 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5770 * oldest_nonremovable_xid when the slot is absent or at risk of being
5771 * dropped, a restart is initiated.
5772 *
5773 * The oldest_nonremovable_xid should be initialized only when the
5774 * subscription's retention is active before launching the worker. See
5775 * logicalrep_worker_launch.
5776 */
5777 if (am_leader_apply_worker() &&
5781 {
5782 ereport(LOG,
5783 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5784 MySubscription->name, "retain_dead_tuples"));
5785
5787 }
5788
5789 /* Setup synchronous commit according to the user's wishes */
5790 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5792
5793 /*
5794 * Keep us informed about subscription or role changes. Note that the
5795 * role's superuser privilege can be revoked.
5796 */
5797 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5799 (Datum) 0);
5800
5803 (Datum) 0);
5804
5805 if (am_tablesync_worker())
5806 ereport(LOG,
5807 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5810 else
5811 ereport(LOG,
5812 (errmsg("logical replication apply worker for subscription \"%s\" has started",
5813 MySubscription->name)));
5814
5816}
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:5136
static void apply_worker_exit(void)
Definition: worker.c:4973
MemoryContext ApplyContext
Definition: worker.c:472
static bool MySubscriptionValid
Definition: worker.c:480
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:887
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4338
@ PGC_S_OVERRIDE
Definition: guc.h:123
@ PGC_SUSET
Definition: guc.h:78
@ PGC_BACKEND
Definition: guc.h:77
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
MemoryContext TopMemoryContext
Definition: mcxt.c:166
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok)

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LockSharedObject(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, Subscription::retaindeadtuples, Subscription::retentionactive, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, TransactionIdIsValid, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsIndexUsableForFindingDeletedTuple()

static bool IsIndexUsableForFindingDeletedTuple ( Oid  localindexoid,
TransactionId  conflict_detection_xmin 
)
static

Definition at line 3210 of file worker.c.

3212{
3213 HeapTuple index_tuple;
3214 TransactionId index_xmin;
3215
3216 index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3217
3218 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3219 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3220
3221 /*
3222 * No need to check for a frozen transaction ID, as
3223 * TransactionIdPrecedes() manages it internally, treating it as falling
3224 * behind the conflict_detection_xmin.
3225 */
3226 index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3227
3228 ReleaseSysCache(index_tuple);
3229
3230 return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3231}
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
Definition: htup_details.h:324
HeapTupleHeader t_data
Definition: htup.h:68
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References elog, ERROR, HeapTupleHeaderGetXmin(), HeapTupleIsValid, ObjectIdGetDatum(), ReleaseSysCache(), SearchSysCache1(), HeapTupleData::t_data, and TransactionIdPrecedes().

Referenced by FindDeletedTupleInLocalRel().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 5970 of file worker.c.

5971{
5973}
bool IsLogicalWorker(void)
Definition: worker.c:5961

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 5961 of file worker.c.

5962{
5963 return MyLogicalRepWorker != NULL;
5964}

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3953 of file worker.c.

3954{
3955 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3956 bool ping_sent = false;
3957 TimeLineID tli;
3958 ErrorContextCallback errcallback;
3959 RetainDeadTuplesData rdt_data = {0};
3960
3961 /*
3962 * Init the ApplyMessageContext which we clean up after each replication
3963 * protocol message.
3964 */
3966 "ApplyMessageContext",
3968
3969 /*
3970 * This memory context is used for per-stream data when the streaming mode
3971 * is enabled. This context is reset on each stream stop.
3972 */
3974 "LogicalStreamingContext",
3976
3977 /* mark as idle, before starting to loop */
3979
3980 /*
3981 * Push apply error context callback. Fields will be filled while applying
3982 * a change.
3983 */
3984 errcallback.callback = apply_error_callback;
3985 errcallback.previous = error_context_stack;
3986 error_context_stack = &errcallback;
3988
3989 /* This outer loop iterates once per wait. */
3990 for (;;)
3991 {
3993 int rc;
3994 int len;
3995 char *buf = NULL;
3996 bool endofstream = false;
3997 long wait_time;
3998
4000
4002
4004
4005 if (len != 0)
4006 {
4007 /* Loop to process all available data (without blocking). */
4008 for (;;)
4009 {
4011
4012 if (len == 0)
4013 {
4014 break;
4015 }
4016 else if (len < 0)
4017 {
4018 ereport(LOG,
4019 (errmsg("data stream from publisher has ended")));
4020 endofstream = true;
4021 break;
4022 }
4023 else
4024 {
4025 int c;
4027
4029 {
4030 ConfigReloadPending = false;
4032 }
4033
4034 /* Reset timeout. */
4035 last_recv_timestamp = GetCurrentTimestamp();
4036 ping_sent = false;
4037
4038 rdt_data.last_recv_time = last_recv_timestamp;
4039
4040 /* Ensure we are reading the data into our memory context. */
4042
4044
4045 c = pq_getmsgbyte(&s);
4046
4047 if (c == PqReplMsg_WALData)
4048 {
4049 XLogRecPtr start_lsn;
4050 XLogRecPtr end_lsn;
4051 TimestampTz send_time;
4052
4053 start_lsn = pq_getmsgint64(&s);
4054 end_lsn = pq_getmsgint64(&s);
4055 send_time = pq_getmsgint64(&s);
4056
4057 if (last_received < start_lsn)
4058 last_received = start_lsn;
4059
4060 if (last_received < end_lsn)
4061 last_received = end_lsn;
4062
4063 UpdateWorkerStats(last_received, send_time, false);
4064
4065 apply_dispatch(&s);
4066
4067 maybe_advance_nonremovable_xid(&rdt_data, false);
4068 }
4069 else if (c == PqReplMsg_Keepalive)
4070 {
4071 XLogRecPtr end_lsn;
4073 bool reply_requested;
4074
4075 end_lsn = pq_getmsgint64(&s);
4077 reply_requested = pq_getmsgbyte(&s);
4078
4079 if (last_received < end_lsn)
4080 last_received = end_lsn;
4081
4082 send_feedback(last_received, reply_requested, false);
4083
4084 maybe_advance_nonremovable_xid(&rdt_data, false);
4085
4086 UpdateWorkerStats(last_received, timestamp, true);
4087 }
4088 else if (c == PqReplMsg_PrimaryStatusUpdate)
4089 {
4090 rdt_data.remote_lsn = pq_getmsgint64(&s);
4093 rdt_data.reply_time = pq_getmsgint64(&s);
4094
4095 /*
4096 * This should never happen, see
4097 * ProcessStandbyPSRequestMessage. But if it happens
4098 * due to a bug, we don't want to proceed as it can
4099 * incorrectly advance oldest_nonremovable_xid.
4100 */
4101 if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
4102 elog(ERROR, "cannot get the latest WAL position from the publisher");
4103
4104 maybe_advance_nonremovable_xid(&rdt_data, true);
4105
4106 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4107 }
4108 /* other message types are purposefully ignored */
4109
4111 }
4112
4114 }
4115 }
4116
4117 /* confirm all writes so far */
4118 send_feedback(last_received, false, false);
4119
4120 /* Reset the timestamp if no message was received */
4121 rdt_data.last_recv_time = 0;
4122
4123 maybe_advance_nonremovable_xid(&rdt_data, false);
4124
4126 {
4127 /*
4128 * If we didn't get any transactions for a while there might be
4129 * unconsumed invalidation messages in the queue, consume them
4130 * now.
4131 */
4134
4135 /* Process any table synchronization changes. */
4136 process_syncing_tables(last_received);
4137 }
4138
4139 /* Cleanup the memory. */
4142
4143 /* Check if we need to exit the streaming loop. */
4144 if (endofstream)
4145 break;
4146
4147 /*
4148 * Wait for more data or latch. If we have unflushed transactions,
4149 * wake up after WalWriterDelay to see if they've been flushed yet (in
4150 * which case we should send a feedback message). Otherwise, there's
4151 * no particular urgency about waking up unless we get data or a
4152 * signal.
4153 */
4155 wait_time = WalWriterDelay;
4156 else
4157 wait_time = NAPTIME_PER_CYCLE;
4158
4159 /*
4160 * Ensure to wake up when it's possible to advance the non-removable
4161 * transaction ID, or when the retention duration may have exceeded
4162 * max_retention_duration.
4163 */
4165 {
4166 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4167 rdt_data.xid_advance_interval)
4168 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4169 else if (MySubscription->maxretention > 0)
4170 wait_time = Min(wait_time, MySubscription->maxretention);
4171 }
4172
4176 fd, wait_time,
4177 WAIT_EVENT_LOGICAL_APPLY_MAIN);
4178
4179 if (rc & WL_LATCH_SET)
4180 {
4183 }
4184
4186 {
4187 ConfigReloadPending = false;
4189 }
4190
4191 if (rc & WL_TIMEOUT)
4192 {
4193 /*
4194 * We didn't receive anything new. If we haven't heard anything
4195 * from the server for more than wal_receiver_timeout / 2, ping
4196 * the server. Also, if it's been longer than
4197 * wal_receiver_status_interval since the last update we sent,
4198 * send a status update to the primary anyway, to report any
4199 * progress in applying WAL.
4200 */
4201 bool requestReply = false;
4202
4203 /*
4204 * Check if time since last receive from primary has reached the
4205 * configured limit.
4206 */
4207 if (wal_receiver_timeout > 0)
4208 {
4210 TimestampTz timeout;
4211
4212 timeout =
4213 TimestampTzPlusMilliseconds(last_recv_timestamp,
4215
4216 if (now >= timeout)
4217 ereport(ERROR,
4218 (errcode(ERRCODE_CONNECTION_FAILURE),
4219 errmsg("terminating logical replication worker due to timeout")));
4220
4221 /* Check to see if it's time for a ping. */
4222 if (!ping_sent)
4223 {
4224 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4225 (wal_receiver_timeout / 2));
4226 if (now >= timeout)
4227 {
4228 requestReply = true;
4229 ping_sent = true;
4230 }
4231 }
4232 }
4233
4234 send_feedback(last_received, requestReply, requestReply);
4235
4236 maybe_advance_nonremovable_xid(&rdt_data, false);
4237
4238 /*
4239 * Force reporting to ensure long idle periods don't lead to
4240 * arbitrarily delayed stats. Stats can only be reported outside
4241 * of (implicit or explicit) transactions. That shouldn't lead to
4242 * stats being delayed for long, because transactions are either
4243 * sent as a whole on commit or streamed. Streamed transactions
4244 * are spilled to disk and applied on commit.
4245 */
4246 if (!IsTransactionState())
4247 pgstat_report_stat(true);
4248 }
4249 }
4250
4251 /* Pop the error context stack */
4252 error_context_stack = errcallback.previous;
4254
4255 /* All done */
4257}
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3937
#define NAPTIME_PER_CYCLE
Definition: worker.c:299
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:469
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:4266
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4356
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
void apply_error_callback(void *arg)
Definition: worker.c:6118
static MemoryContext LogicalStreamingContext
Definition: worker.c:475
uint64_t uint64
Definition: c.h:539
ErrorContextCallback * error_context_stack
Definition: elog.c:95
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:223
void ResetLatch(Latch *latch)
Definition: latch.c:374
static char * buf
Definition: pg_test_fsync.c:72
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
char * c
#define PqReplMsg_WALData
Definition: protocol.h:77
#define PqReplMsg_Keepalive
Definition: protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition: protocol.h:76
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
FullTransactionId remote_oldestxid
Definition: worker.c:412
FullTransactionId remote_nextxid
Definition: worker.c:419
XLogRecPtr remote_lsn
Definition: worker.c:404
TimestampTz reply_time
Definition: worker.c:421
static FullTransactionId FullTransactionIdFromU64(uint64 value)
Definition: transam.h:81
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
int wal_receiver_timeout
Definition: walreceiver.c:89
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
int WalWriterDelay
Definition: walwriter.c:70
uint32 TimeLineID
Definition: xlogdefs.h:62

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), elog, ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), FullTransactionIdFromU64(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), RetainDeadTuplesData::last_recv_time, len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, Subscription::maxretention, maybe_advance_nonremovable_xid(), maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), Min, MyLatch, MySubscription, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), RetainDeadTuplesData::phase, pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_PrimaryStatusUpdate, PqReplMsg_WALData, ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), RDT_GET_CANDIDATE_XID, RetainDeadTuplesData::remote_lsn, RetainDeadTuplesData::remote_nextxid, RetainDeadTuplesData::remote_oldestxid, RetainDeadTuplesData::reply_time, ResetLatch(), Subscription::retentionactive, send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, RetainDeadTuplesData::xid_advance_interval, and XLogRecPtrIsInvalid.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_advance_nonremovable_xid()

static void maybe_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4356 of file worker.c.

4358{
4359 if (!can_advance_nonremovable_xid(rdt_data))
4360 return;
4361
4362 process_rdt_phase_transition(rdt_data, status_received);
4363}
static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4370

References can_advance_nonremovable_xid(), and process_rdt_phase_transition().

Referenced by LogicalRepApplyLoop().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 5007 of file worker.c.

5008{
5009 MemoryContext oldctx;
5011 bool started_tx = false;
5012
5013 /* When cache state is valid there is nothing to do here. */
5015 return;
5016
5017 /* This function might be called inside or outside of transaction. */
5018 if (!IsTransactionState())
5019 {
5021 started_tx = true;
5022 }
5023
5024 /* Ensure allocations in permanent context. */
5026
5028
5029 /*
5030 * Exit if the subscription was removed. This normally should not happen
5031 * as the worker gets killed during DROP SUBSCRIPTION.
5032 */
5033 if (!newsub)
5034 {
5035 ereport(LOG,
5036 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5037 MySubscription->name)));
5038
5039 /* Ensure we remove no-longer-useful entry for worker's start time */
5042
5043 proc_exit(0);
5044 }
5045
5046 /* Exit if the subscription was disabled. */
5047 if (!newsub->enabled)
5048 {
5049 ereport(LOG,
5050 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5051 MySubscription->name)));
5052
5054 }
5055
5056 /* !slotname should never happen when enabled is true. */
5057 Assert(newsub->slotname);
5058
5059 /* two-phase cannot be altered while the worker is running */
5060 Assert(newsub->twophasestate == MySubscription->twophasestate);
5061
5062 /*
5063 * Exit if any parameter that affects the remote connection was changed.
5064 * The launcher will start a new worker but note that the parallel apply
5065 * worker won't restart if the streaming option's value is changed from
5066 * 'parallel' to any other value or the server decides not to stream the
5067 * in-progress transaction.
5068 */
5069 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5070 strcmp(newsub->name, MySubscription->name) != 0 ||
5071 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5072 newsub->binary != MySubscription->binary ||
5073 newsub->stream != MySubscription->stream ||
5074 newsub->passwordrequired != MySubscription->passwordrequired ||
5075 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5076 newsub->owner != MySubscription->owner ||
5077 !equal(newsub->publications, MySubscription->publications))
5078 {
5080 ereport(LOG,
5081 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5082 MySubscription->name)));
5083 else
5084 ereport(LOG,
5085 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5086 MySubscription->name)));
5087
5089 }
5090
5091 /*
5092 * Exit if the subscription owner's superuser privileges have been
5093 * revoked.
5094 */
5095 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5096 {
5098 ereport(LOG,
5099 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5101 else
5102 ereport(LOG,
5103 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5105
5107 }
5108
5109 /* Check for other changes that should never happen too. */
5110 if (newsub->dbid != MySubscription->dbid)
5111 {
5112 elog(ERROR, "subscription %u changed unexpectedly",
5114 }
5115
5116 /* Clean old subscription info and switch to new one. */
5119
5120 MemoryContextSwitchTo(oldctx);
5121
5122 /* Change synchronous commit according to the user's wishes */
5123 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5125
5126 if (started_tx)
5128
5129 MySubscriptionValid = true;
5130}
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 5980 of file worker.c.

5981{
5985
5986 /*
5987 * Quick return if it's not requested to skip this transaction. This
5988 * function is called for every remote transaction and we assume that
5989 * skipping the transaction is not used often.
5990 */
5992 MySubscription->skiplsn != finish_lsn))
5993 return;
5994
5995 /* Start skipping all changes of this transaction */
5996 skip_xact_finish_lsn = finish_lsn;
5997
5998 ereport(LOG,
5999 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6001}
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:516

References Assert(), ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ process_rdt_phase_transition()

static void process_rdt_phase_transition ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4392 of file worker.c.

4394{
4395 switch (rdt_data->phase)
4396 {
4398 get_candidate_xid(rdt_data);
4399 break;
4401 request_publisher_status(rdt_data);
4402 break;
4404 wait_for_publisher_status(rdt_data, status_received);
4405 break;
4407 wait_for_local_flush(rdt_data);
4408 break;
4411 break;
4414 break;
4415 }
4416}
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4583
static void get_candidate_xid(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4422
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition: worker.c:4524
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4485
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4811
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4773

References get_candidate_xid(), RetainDeadTuplesData::phase, RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, request_publisher_status(), resume_conflict_info_retention(), stop_conflict_info_retention(), wait_for_local_flush(), and wait_for_publisher_status().

Referenced by get_candidate_xid(), maybe_advance_nonremovable_xid(), wait_for_local_flush(), and wait_for_publisher_status().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 641 of file worker.c.

643{
644 if (OidIsValid(relid))
645 {
646 /* Replication origin name for tablesync workers. */
647 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 }
649 else
650 {
651 /* Replication origin name for non-tablesync workers. */
652 snprintf(originname, szoriginname, "pg_%u", suboid);
653 }
654}

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), run_apply_worker(), and run_tablesync_worker().

◆ replorigin_reset()

static void replorigin_reset ( int  code,
Datum  arg 
)
static

◆ request_publisher_status()

static void request_publisher_status ( RetainDeadTuplesData rdt_data)
static

Definition at line 4485 of file worker.c.

4486{
4487 static StringInfo request_message = NULL;
4488
4489 if (!request_message)
4490 {
4492
4493 request_message = makeStringInfo();
4494 MemoryContextSwitchTo(oldctx);
4495 }
4496 else
4497 resetStringInfo(request_message);
4498
4499 /*
4500 * Send the current time to update the remote walsender's latest reply
4501 * message received time.
4502 */
4504 pq_sendint64(request_message, GetCurrentTimestamp());
4505
4506 elog(DEBUG2, "sending publisher status request message");
4507
4508 /* Send a request for the publisher status */
4510 request_message->data, request_message->len);
4511
4513
4514 /*
4515 * Skip calling maybe_advance_nonremovable_xid() since further transition
4516 * is possible only once we receive the publisher status message.
4517 */
4518}
#define DEBUG2
Definition: elog.h:29
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
#define PqReplMsg_PrimaryStatusRequest
Definition: protocol.h:83
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:457

References ApplyContext, StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, LogRepWorkerWalRcvConn, makeStringInfo(), MemoryContextSwitchTo(), RetainDeadTuplesData::phase, pq_sendbyte(), pq_sendint64(), PqReplMsg_PrimaryStatusRequest, RDT_WAIT_FOR_PUBLISHER_STATUS, resetStringInfo(), and walrcv_send.

Referenced by process_rdt_phase_transition().

◆ reset_apply_error_context_info()

◆ reset_retention_data_fields()

◆ resume_conflict_info_retention()

static void resume_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4811 of file worker.c.

4812{
4813 /* We can't resume retention without updating retention status. */
4814 if (!update_retention_status(true))
4815 return;
4816
4817 ereport(LOG,
4818 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4821 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4822 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4823
4824 /*
4825 * Restart the worker to let the launcher initialize
4826 * oldest_nonremovable_xid at startup.
4827 *
4828 * While it's technically possible to derive this value on-the-fly using
4829 * the conflict detection slot's xmin, doing so risks a race condition:
4830 * the launcher might clean slot.xmin just after retention resumes. This
4831 * would make oldest_nonremovable_xid unreliable, especially during xid
4832 * wraparound.
4833 *
4834 * Although this can be prevented by introducing heavy weight locking, the
4835 * complexity it will bring doesn't seem worthwhile given how rarely
4836 * retention is resumed.
4837 */
4839}
static bool update_retention_status(bool active)
Definition: worker.c:4851

References apply_worker_exit(), ereport, errdetail(), errmsg(), LOG, Subscription::maxretention, MySubscription, Subscription::name, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ run_apply_worker()

static void run_apply_worker ( )
static

Definition at line 5592 of file worker.c.

5593{
5594 char originname[NAMEDATALEN];
5595 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5596 char *slotname = NULL;
5598 RepOriginId originid;
5599 TimeLineID startpointTLI;
5600 char *err;
5601 bool must_use_password;
5602
5603 slotname = MySubscription->slotname;
5604
5605 /*
5606 * This shouldn't happen if the subscription is enabled, but guard against
5607 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5608 * slot is NULL.)
5609 */
5610 if (!slotname)
5611 ereport(ERROR,
5612 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5613 errmsg("subscription has no replication slot set")));
5614
5615 /* Setup replication origin tracking. */
5617 originname, sizeof(originname));
5619 originid = replorigin_by_name(originname, true);
5620 if (!OidIsValid(originid))
5621 originid = replorigin_create(originname);
5622 replorigin_session_setup(originid, 0);
5623 replorigin_session_origin = originid;
5624 origin_startpos = replorigin_session_get_progress(false);
5626
5627 /* Is the use of a password mandatory? */
5628 must_use_password = MySubscription->passwordrequired &&
5630
5632 true, must_use_password,
5634
5635 if (LogRepWorkerWalRcvConn == NULL)
5636 ereport(ERROR,
5637 (errcode(ERRCODE_CONNECTION_FAILURE),
5638 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5639 MySubscription->name, err)));
5640
5641 /*
5642 * We don't really use the output identify_system for anything but it does
5643 * some initializations on the upstream so let's still call it.
5644 */
5645 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5646
5648
5649 set_stream_options(&options, slotname, &origin_startpos);
5650
5651 /*
5652 * Even when the two_phase mode is requested by the user, it remains as
5653 * the tri-state PENDING until all tablesyncs have reached READY state.
5654 * Only then, can it become ENABLED.
5655 *
5656 * Note: If the subscription has no tables then leave the state as
5657 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5658 * work.
5659 */
5660 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5662 {
5663 /* Start streaming with two_phase enabled */
5664 options.proto.logical.twophase = true;
5666
5668
5669 /*
5670 * Updating pg_subscription might involve TOAST table access, so
5671 * ensure we have a valid snapshot.
5672 */
5674
5675 UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5676 MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5679 }
5680 else
5681 {
5683 }
5684
5686 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5688 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5689 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5690 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5691 "?")));
5692
5693 /* Run the main loop. */
5694 start_apply(origin_startpos);
5695}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5483
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5552
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6260
void err(int eval, const char *fmt,...)
Definition: err.c:43
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1120
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1273
#define NAMEDATALEN
static char ** options
bool AllTablesyncsReady(void)
Definition: tablesync.c:1770
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1821
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:443
uint16 RepOriginId
Definition: xlogdefs.h:68

References AllTablesyncsReady(), CommitTransactionCommand(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, GetTransactionSnapshot(), InvalidOid, InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::ownersuperuser, Subscription::passwordrequired, PopActiveSnapshot(), PushActiveSnapshot(), ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), set_apply_error_context_origin(), set_stream_options(), Subscription::slotname, start_apply(), StartTransactionCommand(), Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by ApplyWorkerMain().

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 4266 of file worker.c.

4267{
4268 static StringInfo reply_message = NULL;
4269 static TimestampTz send_time = 0;
4270
4271 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4272 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4273
4274 XLogRecPtr writepos;
4275 XLogRecPtr flushpos;
4277 bool have_pending_txes;
4278
4279 /*
4280 * If the user doesn't want status to be reported to the publisher, be
4281 * sure to exit before doing anything at all.
4282 */
4283 if (!force && wal_receiver_status_interval <= 0)
4284 return;
4285
4286 /* It's legal to not pass a recvpos */
4287 if (recvpos < last_recvpos)
4288 recvpos = last_recvpos;
4289
4290 get_flush_position(&writepos, &flushpos, &have_pending_txes);
4291
4292 /*
4293 * No outstanding transactions to flush, we can report the latest received
4294 * position. This is important for synchronous replication.
4295 */
4296 if (!have_pending_txes)
4297 flushpos = writepos = recvpos;
4298
4299 if (writepos < last_writepos)
4300 writepos = last_writepos;
4301
4302 if (flushpos < last_flushpos)
4303 flushpos = last_flushpos;
4304
4306
4307 /* if we've already reported everything we're good */
4308 if (!force &&
4309 writepos == last_writepos &&
4310 flushpos == last_flushpos &&
4311 !TimestampDifferenceExceeds(send_time, now,
4313 return;
4314 send_time = now;
4315
4316 if (!reply_message)
4317 {
4319
4321 MemoryContextSwitchTo(oldctx);
4322 }
4323 else
4325
4327 pq_sendint64(reply_message, recvpos); /* write */
4328 pq_sendint64(reply_message, flushpos); /* flush */
4329 pq_sendint64(reply_message, writepos); /* apply */
4330 pq_sendint64(reply_message, now); /* sendTime */
4331 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4332
4333 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4334 force,
4335 LSN_FORMAT_ARGS(recvpos),
4336 LSN_FORMAT_ARGS(writepos),
4337 LSN_FORMAT_ARGS(flushpos));
4338
4341
4342 if (recvpos > last_recvpos)
4343 last_recvpos = recvpos;
4344 if (writepos > last_writepos)
4345 last_writepos = writepos;
4346 if (flushpos > last_flushpos)
4347 last_flushpos = flushpos;
4348}
static XLogRecPtr last_flushpos
Definition: worker.c:527
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:3867
#define PqReplMsg_StandbyStatusUpdate
Definition: protocol.h:84
static StringInfoData reply_message
Definition: walreceiver.c:132

References ApplyContext, StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, last_flushpos, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 6260 of file worker.c.

6261{
6263 originname);
6264}
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_xact()

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char *  slotname,
XLogRecPtr origin_startpos 
)

Definition at line 5483 of file worker.c.

5486{
5487 int server_version;
5488
5489 options->logical = true;
5490 options->startpoint = *origin_startpos;
5491 options->slotname = slotname;
5492
5494 options->proto.logical.proto_version =
5499
5500 options->proto.logical.publication_names = MySubscription->publications;
5501 options->proto.logical.binary = MySubscription->binary;
5502
5503 /*
5504 * Assign the appropriate option value for streaming option according to
5505 * the 'streaming' mode and the publisher's ability to support that mode.
5506 */
5507 if (server_version >= 160000 &&
5508 MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5509 {
5510 options->proto.logical.streaming_str = "parallel";
5512 }
5513 else if (server_version >= 140000 &&
5514 MySubscription->stream != LOGICALREP_STREAM_OFF)
5515 {
5516 options->proto.logical.streaming_str = "on";
5518 }
5519 else
5520 {
5521 options->proto.logical.streaming_str = NULL;
5523 }
5524
5525 options->proto.logical.twophase = false;
5526 options->proto.logical.origin = pstrdup(MySubscription->origin);
5527}
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static int server_version
Definition: pg_dumpall.c:109
#define walrcv_server_version(conn)
Definition: walreceiver.h:447

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 5831 of file worker.c.

5832{
5833 /* Attach to slot */
5834 logicalrep_worker_attach(worker_slot);
5835
5837
5838 /* Setup signal handling */
5840 pqsignal(SIGTERM, die);
5842
5843 /*
5844 * We don't currently need any ResourceOwner in a walreceiver process, but
5845 * if we did, we could call CreateAuxProcessResourceOwner here.
5846 */
5847
5848 /* Initialise stats to a sanish value */
5851
5852 /* Load the libpq-specific functions */
5853 load_file("libpqwalreceiver", false);
5854
5856
5857 /*
5858 * Register a callback to reset the origin state before aborting any
5859 * pending transaction during shutdown (see ShutdownPostgres()). This will
5860 * avoid origin advancement for an in-complete transaction which could
5861 * otherwise lead to its loss as such a transaction won't be sent by the
5862 * server again.
5863 *
5864 * Note that even a LOG or DEBUG statement placed after setting the origin
5865 * state may process a shutdown signal before committing the current apply
5866 * operation. So, it is important to register such a callback here.
5867 */
5869
5870 /* Connect to the origin and start the replication. */
5871 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5873
5874 /*
5875 * Setup callback for syscache so that we know when something changes in
5876 * the subscription relation state.
5877 */
5878 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5880 (Datum) 0);
5881}
static void replorigin_reset(int code, Datum arg)
Definition: worker.c:5822
void InitializeLogRepWorker(void)
Definition: worker.c:5705
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:927
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void logicalrep_worker_attach(int slot)
Definition: launcher.c:731
#define die(msg)
#define pqsignal
Definition: port.h:531
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
#define SIGHUP
Definition: win32_port.h:158

References am_leader_apply_worker(), am_tablesync_worker(), Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog, GetCurrentTimestamp(), InitializeLogRepWorker(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal, replorigin_reset(), LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), and TablesyncWorkerMain().

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 681 of file worker.c.

682{
683 switch (MyLogicalRepWorker->type)
684 {
686 return MyLogicalRepWorker->relid == rel->localreloid;
687
689 /* We don't synchronize rel's that are in unknown state. */
690 if (rel->state != SUBREL_STATE_READY &&
691 rel->state != SUBREL_STATE_UNKNOWN)
693 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
694 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
696 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
697
698 return rel->state == SUBREL_STATE_READY;
699
700 case WORKERTYPE_APPLY:
701 return (rel->state == SUBREL_STATE_READY ||
702 (rel->state == SUBREL_STATE_SYNCDONE &&
703 rel->statelsn <= remote_final_lsn));
704
706 /* Should never happen. */
707 elog(ERROR, "Unknown worker type");
708 }
709
710 return false; /* dummy for compiler */
711}
LogicalRepWorkerType type
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY

References elog, ereport, errcode(), errdetail(), errmsg(), ERROR, LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ should_stop_conflict_info_retention()

static bool should_stop_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4738 of file worker.c.

4739{
4741
4744 rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4745
4747 return false;
4748
4749 /*
4750 * Use last_recv_time when applying changes in the loop to avoid
4751 * unnecessary system time retrieval. If last_recv_time is not available,
4752 * obtain the current timestamp.
4753 */
4754 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4755
4756 /*
4757 * Return early if the wait time has not exceeded the configured maximum
4758 * (max_retention_duration). Time spent waiting for table synchronization
4759 * is excluded from this calculation, as it occurs infrequently.
4760 */
4763 rdt_data->table_sync_wait_time))
4764 return false;
4765
4766 return true;
4767}

References Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, GetCurrentTimestamp(), RetainDeadTuplesData::last_recv_time, Subscription::maxretention, MySubscription, now(), RetainDeadTuplesData::phase, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, RetainDeadTuplesData::table_sync_wait_time, TimestampDifferenceExceeds(), and TransactionIdIsValid.

Referenced by wait_for_local_flush(), and wait_for_publisher_status().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 954 of file worker.c.

956{
958 int num_phys_attrs = desc->natts;
959 int i;
960 int attnum,
961 num_defaults = 0;
962 int *defmap;
963 ExprState **defexprs;
964 ExprContext *econtext;
965
966 econtext = GetPerTupleExprContext(estate);
967
968 /* We got all the data via replication, no need to evaluate anything. */
969 if (num_phys_attrs == rel->remoterel.natts)
970 return;
971
972 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
973 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
974
975 Assert(rel->attrmap->maplen == num_phys_attrs);
976 for (attnum = 0; attnum < num_phys_attrs; attnum++)
977 {
978 Expr *defexpr;
979
980 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
981 continue;
982
983 if (rel->attrmap->attnums[attnum] >= 0)
984 continue;
985
986 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
987
988 if (defexpr != NULL)
989 {
990 /* Run the expression through planner */
991 defexpr = expression_planner(defexpr);
992
993 /* Initialize executable expression in copycontext */
994 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
995 defmap[num_defaults] = attnum;
996 num_defaults++;
997 }
998 }
999
1000 for (i = 0; i < num_defaults; i++)
1001 slot->tts_values[defmap[i]] =
1002 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1003}
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:143
#define GetPerTupleExprContext(estate)
Definition: executor.h:653
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:390
int16 attnum
Definition: pg_attribute.h:74
Expr * expression_planner(Expr *expr)
Definition: planner.c:6746
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool * tts_isnull
Definition: tuptable.h:126
Datum * tts_values
Definition: tuptable.h:124

References Assert(), attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 1112 of file worker.c.

1115{
1116 int natts = slot->tts_tupleDescriptor->natts;
1117 int i;
1118
1119 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1120 ExecClearTuple(slot);
1121
1122 /*
1123 * Copy all the column data from srcslot, so that we'll have valid values
1124 * for unreplaced columns.
1125 */
1126 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1127 slot_getallattrs(srcslot);
1128 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1129 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1130
1131 /* Call the "in" function for each replaced attribute */
1132 Assert(natts == rel->attrmap->maplen);
1133 for (i = 0; i < natts; i++)
1134 {
1136 int remoteattnum = rel->attrmap->attnums[i];
1137
1138 if (remoteattnum < 0)
1139 continue;
1140
1141 Assert(remoteattnum < tupleData->ncols);
1142
1143 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1144 {
1145 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1146
1147 /* Set attnum for error callback */
1149
1150 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1151 {
1152 Oid typinput;
1153 Oid typioparam;
1154
1155 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1156 slot->tts_values[i] =
1157 OidInputFunctionCall(typinput, colvalue->data,
1158 typioparam, att->atttypmod);
1159 slot->tts_isnull[i] = false;
1160 }
1161 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1162 {
1163 Oid typreceive;
1164 Oid typioparam;
1165
1166 /*
1167 * In some code paths we may be asked to re-parse the same
1168 * tuple data. Reset the StringInfo's cursor so that works.
1169 */
1170 colvalue->cursor = 0;
1171
1172 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1173 slot->tts_values[i] =
1174 OidReceiveFunctionCall(typreceive, colvalue,
1175 typioparam, att->atttypmod);
1176
1177 /* Trouble if it didn't eat the whole buffer */
1178 if (colvalue->cursor != colvalue->len)
1179 ereport(ERROR,
1180 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1181 errmsg("incorrect binary data format in logical replication column %d",
1182 remoteattnum + 1)));
1183 slot->tts_isnull[i] = false;
1184 }
1185 else
1186 {
1187 /* must be LOGICALREP_COLUMN_NULL */
1188 slot->tts_values[i] = (Datum) 0;
1189 slot->tts_isnull[i] = true;
1190 }
1191
1192 /* Reset attnum for error callback */
1194 }
1195 }
1196
1197 /* And finally, declare that "slot" contains a valid virtual tuple */
1199}
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1741
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1772
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1754
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:99
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:98
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:3041
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:3107
StringInfoData * colvalues
Definition: logicalproto.h:87
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 1011 of file worker.c.

1013{
1014 int natts = slot->tts_tupleDescriptor->natts;
1015 int i;
1016
1017 ExecClearTuple(slot);
1018
1019 /* Call the "in" function for each non-dropped, non-null attribute */
1020 Assert(natts == rel->attrmap->maplen);
1021 for (i = 0; i < natts; i++)
1022 {
1024 int remoteattnum = rel->attrmap->attnums[i];
1025
1026 if (!att->attisdropped && remoteattnum >= 0)
1027 {
1028 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1029
1030 Assert(remoteattnum < tupleData->ncols);
1031
1032 /* Set attnum for error callback */
1034
1035 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1036 {
1037 Oid typinput;
1038 Oid typioparam;
1039
1040 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1041 slot->tts_values[i] =
1042 OidInputFunctionCall(typinput, colvalue->data,
1043 typioparam, att->atttypmod);
1044 slot->tts_isnull[i] = false;
1045 }
1046 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1047 {
1048 Oid typreceive;
1049 Oid typioparam;
1050
1051 /*
1052 * In some code paths we may be asked to re-parse the same
1053 * tuple data. Reset the StringInfo's cursor so that works.
1054 */
1055 colvalue->cursor = 0;
1056
1057 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1058 slot->tts_values[i] =
1059 OidReceiveFunctionCall(typreceive, colvalue,
1060 typioparam, att->atttypmod);
1061
1062 /* Trouble if it didn't eat the whole buffer */
1063 if (colvalue->cursor != colvalue->len)
1064 ereport(ERROR,
1065 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1066 errmsg("incorrect binary data format in logical replication column %d",
1067 remoteattnum + 1)));
1068 slot->tts_isnull[i] = false;
1069 }
1070 else
1071 {
1072 /*
1073 * NULL value from remote. (We don't expect to see
1074 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1075 * NULL.)
1076 */
1077 slot->tts_values[i] = (Datum) 0;
1078 slot->tts_isnull[i] = true;
1079 }
1080
1081 /* Reset attnum for error callback */
1083 }
1084 else
1085 {
1086 /*
1087 * We assign NULL to dropped attributes and missing values
1088 * (missing values should be later filled using
1089 * slot_fill_defaults).
1090 */
1091 slot->tts_values[i] = (Datum) 0;
1092 slot->tts_isnull[i] = true;
1093 }
1094 }
1095
1097}

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_tuple_routing(), apply_handle_update(), and apply_handle_update_internal().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 5552 of file worker.c.

5553{
5554 PG_TRY();
5555 {
5556 LogicalRepApplyLoop(origin_startpos);
5557 }
5558 PG_CATCH();
5559 {
5560 /*
5561 * Reset the origin state to prevent the advancement of origin
5562 * progress if we fail to apply. Otherwise, this will result in
5563 * transaction loss as that transaction won't be sent again by the
5564 * server.
5565 */
5566 replorigin_reset(0, (Datum) 0);
5567
5570 else
5571 {
5572 /*
5573 * Report the worker failed while applying changes. Abort the
5574 * current transaction so that the stats message is sent in an
5575 * idle state.
5576 */
5579
5580 PG_RE_THROW();
5581 }
5582 }
5583 PG_END_TRY();
5584}
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3953
void DisableSubscriptionAndExit(void)
Definition: worker.c:5905
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), and replorigin_reset().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ stop_conflict_info_retention()

static void stop_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4773 of file worker.c.

4774{
4775 /* Stop retention if not yet */
4777 {
4778 /*
4779 * If the retention status cannot be updated (e.g., due to active
4780 * transaction), skip further processing to avoid inconsistent
4781 * retention behavior.
4782 */
4783 if (!update_retention_status(false))
4784 return;
4785
4789
4790 ereport(LOG,
4791 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4793 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4794 }
4795
4797
4798 /*
4799 * If retention has been stopped, reset to the initial phase to retry
4800 * resuming retention. This reset is required to recalculate the current
4801 * wait time and resume retention if the time falls within
4802 * max_retention_duration.
4803 */
4805}
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4890

References Assert(), ereport, errdetail(), errmsg(), InvalidTransactionId, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::relmutex, reset_retention_data_fields(), Subscription::retentionactive, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 6007 of file worker.c.

6008{
6009 if (!is_skipping_changes())
6010 return;
6011
6012 ereport(LOG,
6013 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6015
6016 /* Stop skipping changes */
6018}

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3911 of file worker.c.

3912{
3913 FlushPosition *flushpos;
3914
3915 /*
3916 * Skip for parallel apply workers, because the lsn_mapping is maintained
3917 * by the leader apply worker.
3918 */
3920 return;
3921
3922 /* Need to do this in permanent context */
3924
3925 /* Track commit lsn */
3926 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3927 flushpos->local_end = local_lsn;
3928 flushpos->remote_end = remote_lsn;
3929
3930 dlist_push_tail(&lsn_mapping, &flushpos->node);
3932}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:303

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_abort_internal()

static void stream_abort_internal ( TransactionId  xid,
TransactionId  subxid 
)
static

Definition at line 1966 of file worker.c.

1967{
1968 /*
1969 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1970 * just delete the files with serialized info.
1971 */
1972 if (xid == subxid)
1974 else
1975 {
1976 /*
1977 * OK, so it's a subxact. We need to read the subxact file for the
1978 * toplevel transaction, determine the offset tracked for the subxact,
1979 * and truncate the file with changes. We also remove the subxacts
1980 * with higher offsets (or rather higher XIDs).
1981 *
1982 * We intentionally scan the array from the tail, because we're likely
1983 * aborting a change for the most recent subtransactions.
1984 *
1985 * We can't use the binary search here as subxact XIDs won't
1986 * necessarily arrive in sorted order, consider the case where we have
1987 * released the savepoint for multiple subtransactions and then
1988 * performed rollback to savepoint for one of the earlier
1989 * sub-transaction.
1990 */
1991 int64 i;
1992 int64 subidx;
1993 BufFile *fd;
1994 bool found = false;
1995 char path[MAXPGPATH];
1996
1997 subidx = -1;
2000
2001 for (i = subxact_data.nsubxacts; i > 0; i--)
2002 {
2003 if (subxact_data.subxacts[i - 1].xid == subxid)
2004 {
2005 subidx = (i - 1);
2006 found = true;
2007 break;
2008 }
2009 }
2010
2011 /*
2012 * If it's an empty sub-transaction then we will not find the subxid
2013 * here so just cleanup the subxact info and return.
2014 */
2015 if (!found)
2016 {
2017 /* Cleanup the subxact info */
2021 return;
2022 }
2023
2024 /* open the changes file */
2027 O_RDWR, false);
2028
2029 /* OK, truncate the file at the right offset */
2031 subxact_data.subxacts[subidx].offset);
2033
2034 /* discard the subxacts added later */
2035 subxact_data.nsubxacts = subidx;
2036
2037 /* write the updated subxact list */
2039
2042 }
2043}
static void cleanup_subxact_info(void)
Definition: worker.c:5533
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:5151
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:5200
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:928
int64_t int64
Definition: c.h:535
off_t offset
Definition: worker.c:533
TransactionId xid
Definition: worker.c:531
int fileno
Definition: worker.c:532

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), fd(), SubXactInfo::fileno, i, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_handle_stream_abort().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 5350 of file worker.c.

5351{
5352 char path[MAXPGPATH];
5353
5354 /* Delete the changes file. */
5355 changes_filename(path, subid, xid);
5357
5358 /* Delete the subxact file, if it exists. */
5359 subxact_filename(path, subid, xid);
5361}
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:5329
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_close_file()

static void stream_close_file ( void  )
static

◆ stream_open_and_write_change()

static void stream_open_and_write_change ( TransactionId  xid,
char  action,
StringInfo  s 
)
static

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 5374 of file worker.c.

5375{
5376 char path[MAXPGPATH];
5377 MemoryContext oldcxt;
5378
5379 Assert(OidIsValid(subid));
5381 Assert(stream_fd == NULL);
5382
5383
5384 changes_filename(path, subid, xid);
5385 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5386
5387 /*
5388 * Create/open the buffiles under the logical streaming context so that we
5389 * have those files until stream stop.
5390 */
5392
5393 /*
5394 * If this is the first streamed segment, create the changes file.
5395 * Otherwise, just open the file for writing, in append mode.
5396 */
5397 if (first_segment)
5399 path);
5400 else
5401 {
5402 /*
5403 * Open the file and seek to the end of the file because we always
5404 * append the changes file.
5405 */
5407 path, O_RDWR, false);
5408 BufFileSeek(stream_fd, 0, 0, SEEK_END);
5409 }
5410
5411 MemoryContextSwitchTo(oldcxt);
5412}
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:267

References Assert(), BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by stream_start_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1666 of file worker.c.

1667{
1669
1670 /*
1671 * Initialize the worker's stream_fileset if we haven't yet. This will be
1672 * used for the entire duration of the worker so create it in a permanent
1673 * context. We create this on the very first streaming message from any
1674 * transaction and then use it for this and other streaming transactions.
1675 * Now, we could create a fileset at the start of the worker as well but
1676 * then we won't be sure that it will ever be used.
1677 */
1679 {
1680 MemoryContext oldctx;
1681
1683
1686
1687 MemoryContextSwitchTo(oldctx);
1688 }
1689
1690 /* Open the spool file for this transaction. */
1691 stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1692
1693 /* If this is not the first segment, open existing subxact file. */
1694 if (!first_segment)
1696
1698}
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:5374
void FileSetInit(FileSet *fileset)
Definition: fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1840 of file worker.c.

1841{
1842 /*
1843 * Serialize information about subxacts for the toplevel transaction, then
1844 * close the stream messages spool file.
1845 */
1848
1849 /* We must be in a valid transaction state */
1851
1852 /* Commit the per-stream transaction */
1854
1855 /* Reset per-stream context */
1857}

References Assert(), CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 5437 of file worker.c.

5438{
5439 int len;
5440
5441 Assert(stream_fd != NULL);
5442
5443 /* total on-disk size, including the action type character */
5444 len = (s->len - s->cursor) + sizeof(char);
5445
5446 /* first write the size */
5447 BufFileWrite(stream_fd, &len, sizeof(len));
5448
5449 /* then the action */
5450 BufFileWrite(stream_fd, &action, sizeof(action));
5451
5452 /* and finally the remaining part of the buffer (after the XID) */
5453 len = (s->len - s->cursor);
5454
5456}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition: buffile.c:676

References generate_unaccent_rules::action, Assert(), BufFileWrite(), StringInfoData::cursor, StringInfoData::data, StringInfoData::len, len, and stream_fd.

Referenced by apply_handle_stream_start(), apply_handle_stream_stop(), handle_streamed_transaction(), and stream_open_and_write_change().

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 5136 of file worker.c.

5137{
5138 MySubscriptionValid = false;
5139}

References MySubscriptionValid.

Referenced by InitializeLogRepWorker().

◆ subxact_filename()

static void subxact_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 5329 of file worker.c.

5330{
5331 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5332}

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 5251 of file worker.c.

5252{
5253 SubXactInfo *subxacts = subxact_data.subxacts;
5254 int64 i;
5255
5256 /* We must have a valid top level stream xid and a stream fd. */
5258 Assert(stream_fd != NULL);
5259
5260 /*
5261 * If the XID matches the toplevel transaction, we don't want to add it.
5262 */
5263 if (stream_xid == xid)
5264 return;
5265
5266 /*
5267 * In most cases we're checking the same subxact as we've already seen in
5268 * the last call, so make sure to ignore it (this change comes later).
5269 */
5270 if (subxact_data.subxact_last == xid)
5271 return;
5272
5273 /* OK, remember we're processing this XID. */
5275
5276 /*
5277 * Check if the transaction is already present in the array of subxact. We
5278 * intentionally scan the array from the tail, because we're likely adding
5279 * a change for the most recent subtransactions.
5280 *
5281 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5282 * would allow us to use binary search here.
5283 */
5284 for (i = subxact_data.nsubxacts; i > 0; i--)
5285 {
5286 /* found, so we're done */
5287 if (subxacts[i - 1].xid == xid)
5288 return;
5289 }
5290
5291 /* This is a new subxact, so we need to add it to the array. */
5292 if (subxact_data.nsubxacts == 0)
5293 {
5294 MemoryContext oldctx;
5295
5297
5298 /*
5299 * Allocate this memory for subxacts in per-stream context, see
5300 * subxact_info_read.
5301 */
5303 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5304 MemoryContextSwitchTo(oldctx);
5305 }
5307 {
5309 subxacts = repalloc(subxacts,
5311 }
5312
5313 subxacts[subxact_data.nsubxacts].xid = xid;
5314
5315 /*
5316 * Get the current offset of the stream file and store it as offset of
5317 * this subxact.
5318 */
5320 &subxacts[subxact_data.nsubxacts].fileno,
5321 &subxacts[subxact_data.nsubxacts].offset);
5322
5324 subxact_data.subxacts = subxacts;
5325}

References Assert(), BufFileTell(), SubXactInfo::fileno, i, LogicalStreamingContext, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc(), repalloc(), stream_fd, stream_xid, subxact_data, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

◆ subxact_info_read()

static void subxact_info_read ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5200 of file worker.c.

5201{
5202 char path[MAXPGPATH];
5203 Size len;
5204 BufFile *fd;
5205 MemoryContext oldctx;
5206
5210
5211 /*
5212 * If the subxact file doesn't exist that means we don't have any subxact
5213 * info.
5214 */
5215 subxact_filename(path, subid, xid);
5217 true);
5218 if (fd == NULL)
5219 return;
5220
5221 /* read number of subxact items */
5223
5225
5226 /* we keep the maximum as a power of 2 */
5228
5229 /*
5230 * Allocate subxact information in the logical streaming context. We need
5231 * this information during the complete stream so that we can add the sub
5232 * transaction info to this. On stream stop we will flush this information
5233 * to the subxact file and reset the logical streaming context.
5234 */
5237 sizeof(SubXactInfo));
5238 MemoryContextSwitchTo(oldctx);
5239
5240 if (len > 0)
5242
5244}
struct SubXactInfo SubXactInfo
size_t Size
Definition: c.h:610
static uint32 pg_ceil_log2_32(uint32 num)
Definition: pg_bitutils.h:258

References Assert(), BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), fd(), len, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc(), pg_ceil_log2_32(), LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and stream_start_internal().

◆ subxact_info_write()

static void subxact_info_write ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5151 of file worker.c.

5152{
5153 char path[MAXPGPATH];
5154 Size len;
5155 BufFile *fd;
5156
5158
5159 /* construct the subxact filename */
5160 subxact_filename(path, subid, xid);
5161
5162 /* Delete the subxacts file, if exists. */
5163 if (subxact_data.nsubxacts == 0)
5164 {
5167
5168 return;
5169 }
5170
5171 /*
5172 * Create the subxact file if it not already created, otherwise open the
5173 * existing file.
5174 */
5176 true);
5177 if (fd == NULL)
5179
5181
5182 /* Write the subxact count and subxact info */
5185
5187
5188 /* free the memory allocated for subxact info */
5190}

References Assert(), BufFileClose(), BufFileCreateFileSet(), BufFileDeleteFileSet(), BufFileOpenFileSet(), BufFileWrite(), cleanup_subxact_info(), fd(), len, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by stream_abort_internal(), and stream_stop_internal().

◆ TargetPrivilegesCheck()

static void TargetPrivilegesCheck ( Relation  rel,
AclMode  mode 
)
static

Definition at line 2576 of file worker.c.

2577{
2578 Oid relid;
2579 AclResult aclresult;
2580
2581 relid = RelationGetRelid(rel);
2582 aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2583 if (aclresult != ACLCHECK_OK)
2584 aclcheck_error(aclresult,
2585 get_relkind_objtype(rel->rd_rel->relkind),
2586 get_rel_name(relid));
2587
2588 /*
2589 * We lack the infrastructure to honor RLS policies. It might be possible
2590 * to add such infrastructure here, but tablesync workers lack it, too, so
2591 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2592 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2593 * replicate subsequent INSERTs, so we forbid all commands the same.
2594 */
2595 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2596 ereport(ERROR,
2597 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2598 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2601}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
Oid GetUserId(void)
Definition: miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:988
ObjectType get_relkind_objtype(char relkind)
static PgChecksumMode mode
Definition: pg_checksums.c:55
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45

References aclcheck_error(), ACLCHECK_OK, check_enable_rls(), ereport, errcode(), errmsg(), ERROR, get_rel_name(), get_relkind_objtype(), GetUserId(), GetUserNameFromId(), InvalidOid, mode, pg_class_aclcheck(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, and RLS_ENABLED.

Referenced by apply_handle_delete_internal(), apply_handle_insert_internal(), apply_handle_truncate(), apply_handle_tuple_routing(), apply_handle_update_internal(), and FindReplTupleInLocalRel().

◆ update_retention_status()

static bool update_retention_status ( bool  active)
static

Definition at line 4851 of file worker.c.

4852{
4853 /*
4854 * Do not update the catalog during an active transaction. The transaction
4855 * may be started during change application, leading to a possible
4856 * rollback of catalog updates if the application fails subsequently.
4857 */
4858 if (IsTransactionState())
4859 return false;
4860
4862
4863 /*
4864 * Updating pg_subscription might involve TOAST table access, so ensure we
4865 * have a valid snapshot.
4866 */
4868
4869 /* Update pg_subscription.subretentionactive */
4871
4874
4875 /* Notify launcher to update the conflict slot */
4877
4879
4880 return true;
4881}
void ApplyLauncherWakeup(void)
Definition: launcher.c:1141
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)

References ApplyLauncherWakeup(), CommitTransactionCommand(), GetTransactionSnapshot(), IsTransactionState(), MySubscription, Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), Subscription::retentionactive, StartTransactionCommand(), and UpdateDeadTupleRetentionStatus().

Referenced by resume_conflict_info_retention(), and stop_conflict_info_retention().

◆ UpdateWorkerStats()

static void UpdateWorkerStats ( XLogRecPtr  last_lsn,
TimestampTz  send_time,
bool  reply 
)
static

◆ wait_for_local_flush()

static void wait_for_local_flush ( RetainDeadTuplesData rdt_data)
static

Definition at line 4583 of file worker.c.

4584{
4587
4588 /*
4589 * We expect the publisher and subscriber clocks to be in sync using time
4590 * sync service like NTP. Otherwise, we will advance this worker's
4591 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4592 * required to detect update_deleted reliably. This check primarily
4593 * addresses scenarios where the publisher's clock falls behind; if the
4594 * publisher's clock is ahead, subsequent transactions will naturally bear
4595 * later commit timestamps, conforming to the design outlined atop
4596 * worker.c.
4597 *
4598 * XXX Consider waiting for the publisher's clock to catch up with the
4599 * subscriber's before proceeding to the next phase.
4600 */
4602 rdt_data->candidate_xid_time, 0))
4603 ereport(ERROR,
4604 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4605 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4606
4607 /*
4608 * Do not attempt to advance the non-removable transaction ID when table
4609 * sync is in progress. During this time, changes from a single
4610 * transaction may be applied by multiple table sync workers corresponding
4611 * to the target tables. So, it's necessary for all table sync workers to
4612 * apply and flush the corresponding changes before advancing the
4613 * transaction ID, otherwise, dead tuples that are still needed for
4614 * conflict detection in table sync workers could be removed prematurely.
4615 * However, confirming the apply and flush progress across all table sync
4616 * workers is complex and not worth the effort, so we simply return if not
4617 * all tables are in the READY state.
4618 *
4619 * Advancing the transaction ID is necessary even when no tables are
4620 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4621 * While it might seem safe to skip all phases and directly assign
4622 * candidate_xid to oldest_nonremovable_xid during the
4623 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4624 * concurrently add tables to the subscription, the apply worker may not
4625 * process invalidations in time. Consequently,
4626 * HasSubscriptionRelationsCached() might miss the new tables, leading to
4627 * premature advancement of oldest_nonremovable_xid.
4628 *
4629 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4630 * invalidations are guaranteed to be processed before applying changes
4631 * from newly added tables while waiting for the local flush to reach
4632 * remote_lsn.
4633 *
4634 * Additionally, even if we check for subscription tables during
4635 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4636 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4637 * subscription tables at this stage to prevent unnecessary tuple
4638 * retention.
4639 */
4641 {
4643
4644 now = rdt_data->last_recv_time
4645 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4646
4647 /*
4648 * Record the time spent waiting for table sync, it is needed for the
4649 * timeout check in should_stop_conflict_info_retention().
4650 */
4651 rdt_data->table_sync_wait_time =
4653
4654 return;
4655 }
4656
4657 /*
4658 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4659 * retaining conflict information for this worker.
4660 */
4662 {
4664 return;
4665 }
4666
4667 /*
4668 * Update and check the remote flush position if we are applying changes
4669 * in a loop. This is done at most once per WalWriterDelay to avoid
4670 * performing costly operations in get_flush_position() too frequently
4671 * during change application.
4672 */
4673 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4675 rdt_data->last_recv_time, WalWriterDelay))
4676 {
4677 XLogRecPtr writepos;
4678 XLogRecPtr flushpos;
4679 bool have_pending_txes;
4680
4681 /* Fetch the latest remote flush position */
4682 get_flush_position(&writepos, &flushpos, &have_pending_txes);
4683
4684 if (flushpos > last_flushpos)
4685 last_flushpos = flushpos;
4686
4687 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4688 }
4689
4690 /* Return to wait for the changes to be applied */
4691 if (last_flushpos < rdt_data->remote_lsn)
4692 return;
4693
4694 /*
4695 * Reaching this point implies should_stop_conflict_info_retention()
4696 * returned false earlier, meaning that the most recent duration for
4697 * advancing the non-removable transaction ID is within the
4698 * max_retention_duration or max_retention_duration is set to 0.
4699 *
4700 * Therefore, if conflict info retention was previously stopped due to a
4701 * timeout, it is now safe to resume retention.
4702 */
4704 {
4706 return;
4707 }
4708
4709 /*
4710 * Reaching here means the remote WAL position has been received, and all
4711 * transactions up to that position on the publisher have been applied and
4712 * flushed locally. So, we can advance the non-removable transaction ID.
4713 */
4717
4718 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4719 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4720 rdt_data->candidate_xid);
4721
4722 /* Notify launcher to update the xmin of the conflict slot */
4724
4726
4727 /* process the next phase */
4728 process_rdt_phase_transition(rdt_data, false);
4729}
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition: worker.c:4738
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1234
TimestampTz flushpos_update_time
Definition: worker.c:432
bool HasSubscriptionRelationsCached(void)
Definition: tablesync.c:1800

References AllTablesyncsReady(), ApplyLauncherWakeup(), Assert(), RetainDeadTuplesData::candidate_xid, RetainDeadTuplesData::candidate_xid_time, DEBUG2, elog, ereport, errdetail_internal(), errmsg_internal(), ERROR, RetainDeadTuplesData::flushpos_update_time, get_flush_position(), GetCurrentTimestamp(), HasSubscriptionRelationsCached(), last_flushpos, RetainDeadTuplesData::last_recv_time, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, now(), LogicalRepWorker::oldest_nonremovable_xid, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, LogicalRepWorker::relmutex, RetainDeadTuplesData::remote_lsn, RetainDeadTuplesData::reply_time, reset_retention_data_fields(), Subscription::retentionactive, should_stop_conflict_info_retention(), SpinLockAcquire, SpinLockRelease, RetainDeadTuplesData::table_sync_wait_time, TimestampDifferenceExceeds(), TimestampDifferenceMilliseconds(), TransactionIdIsValid, WalWriterDelay, and XLogRecPtrIsInvalid.

Referenced by process_rdt_phase_transition().

◆ wait_for_publisher_status()

static void wait_for_publisher_status ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4524 of file worker.c.

4526{
4527 /*
4528 * Return if we have requested but not yet received the publisher status.
4529 */
4530 if (!status_received)
4531 return;
4532
4533 /*
4534 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4535 * retaining conflict information for this worker.
4536 */
4538 {
4540 return;
4541 }
4542
4544 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4545
4546 /*
4547 * Check if all remote concurrent transactions that were active at the
4548 * first status request have now completed. If completed, proceed to the
4549 * next phase; otherwise, continue checking the publisher status until
4550 * these transactions finish.
4551 *
4552 * It's possible that transactions in the commit phase during the last
4553 * cycle have now finished committing, but remote_oldestxid remains older
4554 * than remote_wait_for. This can happen if some old transaction came in
4555 * the commit phase when we requested status in this cycle. We do not
4556 * handle this case explicitly as it's rare and the benefit doesn't
4557 * justify the required complexity. Tracking would require either caching
4558 * all xids at the publisher or sending them to subscribers. The condition
4559 * will resolve naturally once the remaining transactions are finished.
4560 *
4561 * Directly advancing the non-removable transaction ID is possible if
4562 * there are no activities on the publisher since the last advancement
4563 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4564 * and last_remote_lsn, within the structure for comparison with the
4565 * current cycle's values. Considering the minimal cost of continuing in
4566 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4567 * advance the transaction ID here.
4568 */
4570 rdt_data->remote_oldestxid))
4571 rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4572 else
4574
4575 /* process the next phase */
4576 process_rdt_phase_transition(rdt_data, false);
4577}
#define FullTransactionIdPrecedesOrEquals(a, b)
Definition: transam.h:52
#define FullTransactionIdIsValid(x)
Definition: transam.h:55

References FullTransactionIdIsValid, FullTransactionIdPrecedesOrEquals, RetainDeadTuplesData::phase, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, RetainDeadTuplesData::remote_nextxid, RetainDeadTuplesData::remote_oldestxid, RetainDeadTuplesData::remote_wait_for, and should_stop_conflict_info_retention().

Referenced by process_rdt_phase_transition().

Variable Documentation

◆ apply_error_callback_arg

ApplyErrorCallbackArg apply_error_callback_arg
static
Initial value:
=
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}

Definition at line 459 of file worker.c.

Referenced by apply_dispatch(), apply_error_callback(), apply_handle_delete(), apply_handle_insert(), apply_handle_update(), reset_apply_error_context_info(), set_apply_error_context_origin(), set_apply_error_context_xact(), slot_modify_data(), and slot_store_data().

◆ apply_error_context_stack

ErrorContextCallback* apply_error_context_stack = NULL

Definition at line 469 of file worker.c.

Referenced by LogicalRepApplyLoop(), and ProcessParallelApplyMessage().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ in_streamed_transaction

◆ InitializingApplyWorker

bool InitializingApplyWorker = false

Definition at line 499 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ last_flushpos

XLogRecPtr last_flushpos = InvalidXLogRecPtr
static

Definition at line 527 of file worker.c.

Referenced by send_feedback(), and wait_for_local_flush().

◆ LogicalStreamingContext

MemoryContext LogicalStreamingContext = NULL
static

◆ LogRepWorkerWalRcvConn

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 308 of file worker.c.

Referenced by get_flush_position(), LogicalRepApplyLoop(), and store_flush_position().

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false
static

◆ on_commit_wakeup_workers_subids

List* on_commit_wakeup_workers_subids = NIL
static

Definition at line 482 of file worker.c.

Referenced by AtEOXact_LogicalRepWorkers(), and LogicalRepWorkersWakeupAtCommit().

◆ parallel_stream_nchanges

uint32 parallel_stream_nchanges = 0
static

◆ remote_final_lsn

◆ skip_xact_finish_lsn

XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
static

Definition at line 516 of file worker.c.

Referenced by maybe_start_skipping_changes(), and stop_skipping_changes().

◆ stream_fd

◆ stream_xid

◆ subxact_data