PostgreSQL Source Code git master
Loading...
Searching...
No Matches
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/genam.h"
#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/tupconvert.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/walwriter.h"
#include "replication/conflict.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
#include "utils/wait_event.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  RetainDeadTuplesData
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define MIN_XID_ADVANCE_INTERVAL   100
 
#define MAX_XID_ADVANCE_INTERVAL   180000
 
#define is_skipping_changes()   (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct RetainDeadTuplesData RetainDeadTuplesData
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 
enum  RetainDeadTuplesPhase {
  RDT_GET_CANDIDATE_XID , RDT_REQUEST_PUBLISHER_STATUS , RDT_WAIT_FOR_PUBLISHER_STATUS , RDT_WAIT_FOR_LOCAL_FLUSH ,
  RDT_STOP_CONFLICT_INFO_RETENTION , RDT_RESUME_CONFLICT_INFO_RETENTION
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void maybe_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data, bool status_received)
 
static bool can_advance_nonremovable_xid (RetainDeadTuplesData *rdt_data)
 
static void process_rdt_phase_transition (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void get_candidate_xid (RetainDeadTuplesData *rdt_data)
 
static void request_publisher_status (RetainDeadTuplesData *rdt_data)
 
static void wait_for_publisher_status (RetainDeadTuplesData *rdt_data, bool status_received)
 
static void wait_for_local_flush (RetainDeadTuplesData *rdt_data)
 
static bool should_stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void stop_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static void resume_conflict_info_retention (RetainDeadTuplesData *rdt_data)
 
static bool update_retention_status (bool active)
 
static void reset_retention_data_fields (RetainDeadTuplesData *rdt_data)
 
static void adjust_xid_advance_interval (RetainDeadTuplesData *rdt_data, bool new_xid_found)
 
static void apply_worker_exit (void)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static bool FindDeletedTupleInLocalRel (Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
static void set_wal_receiver_timeout (void)
 
static void on_exit_clear_xact_state (int code, Datum arg)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionDatacreate_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, pgoff_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static bool IsIndexUsableForFindingDeletedTuple (Oid localindexoid, TransactionId conflict_detection_xmin)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
static void run_apply_worker (void)
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void ApplyWorkerMain (Datum main_arg)
 
void DisableSubscriptionAndExit (void)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallbackapply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnLogRepWorkerWalRcvConn = NULL
 
SubscriptionMySubscription = NULL
 
static bool MySubscriptionValid = false
 
static Liston_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
bool InitializingApplyWorker = false
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFilestream_fd = NULL
 
static XLogRecPtr last_flushpos = InvalidXLogRecPtr
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))

Definition at line 522 of file worker.c.

◆ MAX_XID_ADVANCE_INTERVAL

#define MAX_XID_ADVANCE_INTERVAL   180000

Definition at line 461 of file worker.c.

◆ MIN_XID_ADVANCE_INTERVAL

#define MIN_XID_ADVANCE_INTERVAL   100

Definition at line 460 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 304 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

◆ ApplyExecutionData

◆ ApplySubXactData

◆ FlushPosition

◆ RetainDeadTuplesData

◆ SubXactInfo

Enumeration Type Documentation

◆ RetainDeadTuplesPhase

Enumerator
RDT_GET_CANDIDATE_XID 
RDT_REQUEST_PUBLISHER_STATUS 
RDT_WAIT_FOR_PUBLISHER_STATUS 
RDT_WAIT_FOR_LOCAL_FLUSH 
RDT_STOP_CONFLICT_INFO_RETENTION 
RDT_RESUME_CONFLICT_INFO_RETENTION 

Definition at line 392 of file worker.c.

393{
RetainDeadTuplesPhase
Definition worker.c:393
@ RDT_WAIT_FOR_PUBLISHER_STATUS
Definition worker.c:396
@ RDT_RESUME_CONFLICT_INFO_RETENTION
Definition worker.c:399
@ RDT_GET_CANDIDATE_XID
Definition worker.c:394
@ RDT_REQUEST_PUBLISHER_STATUS
Definition worker.c:395
@ RDT_WAIT_FOR_LOCAL_FLUSH
Definition worker.c:397
@ RDT_STOP_CONFLICT_INFO_RETENTION
Definition worker.c:398

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 374 of file worker.c.

375{
376 /* The action for non-streaming transactions. */
378
379 /* Actions for streaming transactions. */
TransApplyAction
Definition worker.c:375
@ TRANS_LEADER_SERIALIZE
Definition worker.c:380
@ TRANS_PARALLEL_APPLY
Definition worker.c:383
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition worker.c:381
@ TRANS_LEADER_APPLY
Definition worker.c:377
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition worker.c:382

Function Documentation

◆ adjust_xid_advance_interval()

static void adjust_xid_advance_interval ( RetainDeadTuplesData rdt_data,
bool  new_xid_found 
)
static

Definition at line 4962 of file worker.c.

4963{
4964 if (rdt_data->xid_advance_interval && !new_xid_found)
4965 {
4969
4970 /*
4971 * No new transaction ID has been assigned since the last check, so
4972 * double the interval, but not beyond the maximum allowable value.
4973 */
4974 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4975 max_interval);
4976 }
4977 else if (rdt_data->xid_advance_interval &&
4979 {
4980 /*
4981 * Retention has been stopped, so double the interval-capped at a
4982 * maximum of 3 minutes. The wal_receiver_status_interval is
4983 * intentionally not used as a upper bound, since the likelihood of
4984 * retention resuming is lower than that of general activity resuming.
4985 */
4986 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4988 }
4989 else
4990 {
4991 /*
4992 * A new transaction ID was found or the interval is not yet
4993 * initialized, so set the interval to the minimum value.
4994 */
4995 rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4996 }
4997
4998 /*
4999 * Ensure the wait time remains within the maximum retention time limit
5000 * when retention is active.
5001 */
5003 rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5005}
#define MAX_XID_ADVANCE_INTERVAL
Definition worker.c:461
#define MIN_XID_ADVANCE_INTERVAL
Definition worker.c:460
Subscription * MySubscription
Definition worker.c:484
#define Min(x, y)
Definition c.h:1091
static int fb(int x)
int wal_receiver_status_interval
Definition walreceiver.c:90

References fb(), MAX_XID_ADVANCE_INTERVAL, Subscription::maxretention, Min, MIN_XID_ADVANCE_INTERVAL, MySubscription, Subscription::retentionactive, and wal_receiver_status_interval.

Referenced by get_candidate_xid().

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3782 of file worker.c.

3783{
3786
3787 /*
3788 * Set the current command being applied. Since this function can be
3789 * called recursively when applying spooled changes, save the current
3790 * command.
3791 */
3794
3795 switch (action)
3796 {
3799 break;
3800
3803 break;
3804
3807 break;
3808
3811 break;
3812
3815 break;
3816
3819 break;
3820
3823 break;
3824
3827 break;
3828
3831 break;
3832
3834
3835 /*
3836 * Logical replication does not use generic logical messages yet.
3837 * Although, it could be used by other applications that use this
3838 * output plugin.
3839 */
3840 break;
3841
3844 break;
3845
3848 break;
3849
3852 break;
3853
3856 break;
3857
3860 break;
3861
3864 break;
3865
3868 break;
3869
3872 break;
3873
3876 break;
3877
3878 default:
3879 ereport(ERROR,
3881 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3882 }
3883
3884 /* Reset the current command */
3886}
static void apply_handle_stream_prepare(StringInfo s)
Definition worker.c:1525
static void apply_handle_type(StringInfo s)
Definition worker.c:2593
static void apply_handle_truncate(StringInfo s)
Definition worker.c:3654
static void apply_handle_update(StringInfo s)
Definition worker.c:2797
static void apply_handle_stream_commit(StringInfo s)
Definition worker.c:2397
static void apply_handle_commit_prepared(StringInfo s)
Definition worker.c:1412
static ApplyErrorCallbackArg apply_error_callback_arg
Definition worker.c:464
static void apply_handle_delete(StringInfo s)
Definition worker.c:3019
static void apply_handle_begin(StringInfo s)
Definition worker.c:1218
static void apply_handle_commit(StringInfo s)
Definition worker.c:1243
static void apply_handle_stream_abort(StringInfo s)
Definition worker.c:2078
static void apply_handle_relation(StringInfo s)
Definition worker.c:2570
static void apply_handle_prepare(StringInfo s)
Definition worker.c:1338
static void apply_handle_rollback_prepared(StringInfo s)
Definition worker.c:1464
static void apply_handle_stream_stop(StringInfo s)
Definition worker.c:1892
static void apply_handle_origin(StringInfo s)
Definition worker.c:1673
static void apply_handle_begin_prepare(StringInfo s)
Definition worker.c:1272
static void apply_handle_stream_start(StringInfo s)
Definition worker.c:1732
static void apply_handle_insert(StringInfo s)
Definition worker.c:2640
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:40
#define ereport(elevel,...)
Definition elog.h:152
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
LogicalRepMsgType
@ LOGICAL_REP_MSG_INSERT
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_BEGIN
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_COMMIT
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_TYPE
@ LOGICAL_REP_MSG_DELETE
@ LOGICAL_REP_MSG_STREAM_COMMIT
@ LOGICAL_REP_MSG_ORIGIN
@ LOGICAL_REP_MSG_UPDATE
static char * errmsg
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
LogicalRepMsgType command
Definition worker.c:330

References apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void arg)

Definition at line 6222 of file worker.c.

6223{
6225
6227 return;
6228
6229 Assert(errarg->origin_name);
6230
6231 if (errarg->rel == NULL)
6232 {
6233 if (!TransactionIdIsValid(errarg->remote_xid))
6234 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6235 errarg->origin_name,
6237 else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6238 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6239 errarg->origin_name,
6241 errarg->remote_xid);
6242 else
6243 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6244 errarg->origin_name,
6246 errarg->remote_xid,
6247 LSN_FORMAT_ARGS(errarg->finish_lsn));
6248 }
6249 else
6250 {
6251 if (errarg->remote_attnum < 0)
6252 {
6253 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6254 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6255 errarg->origin_name,
6257 errarg->rel->remoterel.nspname,
6258 errarg->rel->remoterel.relname,
6259 errarg->remote_xid);
6260 else
6261 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6262 errarg->origin_name,
6264 errarg->rel->remoterel.nspname,
6265 errarg->rel->remoterel.relname,
6266 errarg->remote_xid,
6267 LSN_FORMAT_ARGS(errarg->finish_lsn));
6268 }
6269 else
6270 {
6271 if (!XLogRecPtrIsValid(errarg->finish_lsn))
6272 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6273 errarg->origin_name,
6275 errarg->rel->remoterel.nspname,
6276 errarg->rel->remoterel.relname,
6277 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6278 errarg->remote_xid);
6279 else
6280 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6281 errarg->origin_name,
6283 errarg->rel->remoterel.nspname,
6284 errarg->rel->remoterel.relname,
6285 errarg->rel->remoterel.attnames[errarg->remote_attnum],
6286 errarg->remote_xid,
6287 LSN_FORMAT_ARGS(errarg->finish_lsn));
6288 }
6289 }
6290}
#define Assert(condition)
Definition c.h:943
#define errcontext
Definition elog.h:200
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition proto.c:1212
#define TransactionIdIsValid(xid)
Definition transam.h:41
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References apply_error_callback_arg, Assert, ApplyErrorCallbackArg::command, errcontext, fb(), logicalrep_message_type(), LSN_FORMAT_ARGS, TransactionIdIsValid, and XLogRecPtrIsValid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 1218 of file worker.c.

1219{
1221
1222 /* There must not be an active streaming transaction. */
1224
1227
1228 remote_final_lsn = begin_data.final_lsn;
1229
1231
1232 in_remote_transaction = true;
1233
1235}
bool in_remote_transaction
Definition worker.c:489
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition worker.c:6294
static XLogRecPtr remote_final_lsn
Definition worker.c:490
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition worker.c:6084
static TransactionId stream_xid
Definition worker.c:495
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition proto.c:63

References Assert, fb(), in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, and TransactionIdIsValid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1272 of file worker.c.

1273{
1275
1276 /* Tablesync should never receive prepare. */
1277 if (am_tablesync_worker())
1278 ereport(ERROR,
1280 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1281
1282 /* There must not be an active streaming transaction. */
1284
1287
1288 remote_final_lsn = begin_data.prepare_lsn;
1289
1291
1292 in_remote_transaction = true;
1293
1295}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition proto.c:134
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, and TransactionIdIsValid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1243 of file worker.c.

1244{
1246
1248
1249 if (commit_data.commit_lsn != remote_final_lsn)
1250 ereport(ERROR,
1252 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1253 LSN_FORMAT_ARGS(commit_data.commit_lsn),
1255
1257
1258 /*
1259 * Process any tables that are being synchronized in parallel, as well as
1260 * any newly added tables or sequences.
1261 */
1263
1266}
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition worker.c:2510
static void reset_apply_error_context_info(void)
Definition worker.c:6302
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:98
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition syncutils.c:156

References apply_handle_commit_internal(), ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), ProcessSyncingRelations(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData commit_data)
static

Definition at line 2510 of file worker.c.

2511{
2512 if (is_skipping_changes())
2513 {
2515
2516 /*
2517 * Start a new transaction to clear the subskiplsn, if not started
2518 * yet.
2519 */
2520 if (!IsTransactionState())
2522 }
2523
2524 if (IsTransactionState())
2525 {
2526 /*
2527 * The transaction is either non-empty or skipped, so we clear the
2528 * subskiplsn.
2529 */
2531
2532 /*
2533 * Update origin state so we can restart streaming from correct
2534 * position in case of crash.
2535 */
2538
2540
2541 if (IsTransactionBlock())
2542 {
2543 EndTransactionBlock(false);
2545 }
2546
2547 pgstat_report_stat(false);
2548
2550 }
2551 else
2552 {
2553 /* Process any invalidation messages that might have accumulated. */
2556 }
2557
2558 in_remote_transaction = false;
2559}
static void stop_skipping_changes(void)
Definition worker.c:6111
#define is_skipping_changes()
Definition worker.c:522
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition worker.c:6133
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition worker.c:3946
void maybe_reread_subscription(void)
Definition worker.c:5045
void AcceptInvalidationMessages(void)
Definition inval.c:930
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
long pgstat_report_stat(bool force)
Definition pgstat.c:722
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
bool IsTransactionState(void)
Definition xact.c:389
void StartTransactionCommand(void)
Definition xact.c:3109
bool IsTransactionBlock(void)
Definition xact.c:5022
void CommitTransactionCommand(void)
Definition xact.c:3207
bool EndTransactionBlock(bool chain)
Definition xact.c:4095
XLogRecPtr XactLastCommitEnd
Definition xlog.c:262

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), CommitTransactionCommand(), EndTransactionBlock(), fb(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, pgstat_report_stat(), replorigin_xact_state, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1412 of file worker.c.

1413{
1415 char gid[GIDSIZE];
1416
1419
1420 /* Compute GID for two_phase transactions. */
1422 gid, sizeof(gid));
1423
1424 /* There is no transaction when COMMIT PREPARED is called */
1426
1427 /*
1428 * Update origin state so we can restart streaming from correct position
1429 * in case of crash.
1430 */
1433
1434 FinishPreparedTransaction(gid, true);
1437 pgstat_report_stat(false);
1438
1440 in_remote_transaction = false;
1441
1442 /*
1443 * Process any tables that are being synchronized in parallel, as well as
1444 * any newly added tables or sequences.
1445 */
1447
1449
1452}
static void begin_replication_step(void)
Definition worker.c:733
static void end_replication_step(void)
Definition worker.c:756
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition proto.c:267
void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
Definition twophase.c:2753
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition twophase.c:1503
#define GIDSIZE
Definition xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), fb(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, pgstat_report_activity(), pgstat_report_stat(), ProcessSyncingRelations(), replorigin_xact_state, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 3019 of file worker.c.

3020{
3023 LogicalRepRelId relid;
3026 EState *estate;
3029 bool run_as_owner;
3030
3031 /*
3032 * Quick return if we are skipping data modification changes or handling
3033 * streamed transactions.
3034 */
3035 if (is_skipping_changes() ||
3037 return;
3038
3040
3041 relid = logicalrep_read_delete(s, &oldtup);
3044 {
3045 /*
3046 * The relation can't become interesting in the middle of the
3047 * transaction so it's safe to unlock it.
3048 */
3051 return;
3052 }
3053
3054 /* Set relation for error callback */
3056
3057 /* Check if we can do the delete. */
3059
3060 /*
3061 * Make sure that any user-supplied code runs as the table owner, unless
3062 * the user has opted out of that behavior.
3063 */
3065 if (!run_as_owner)
3066 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3067
3068 /* Initialize the executor state. */
3070 estate = edata->estate;
3073 &TTSOpsVirtual);
3074
3075 /* Build the search tuple. */
3079
3080 /* For a partitioned table, apply delete to correct partition. */
3081 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3084 else
3085 {
3086 ResultRelInfo *relinfo = edata->targetRelInfo;
3087
3088 ExecOpenIndices(relinfo, false);
3092 }
3093
3095
3096 /* Reset relation for error callback */
3098
3099 if (!run_as_owner)
3101
3103
3105}
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition worker.c:2756
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition worker.c:877
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition worker.c:688
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition worker.c:784
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition worker.c:3358
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition worker.c:1024
static void finish_edata(ApplyExecutionData *edata)
Definition worker.c:935
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition worker.c:3113
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
#define GetPerTupleMemoryContext(estate)
Definition executor.h:672
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
uint32 LogicalRepRelId
@ CMD_DELETE
Definition nodes.h:278
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition proto.c:561
#define RelationGetDescr(relation)
Definition rel.h:542
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
LogicalRepRelMapEntry * rel
Definition worker.c:331
Form_pg_class rd_rel
Definition rel.h:111
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), fb(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 3113 of file worker.c.

3117{
3118 EState *estate = edata->estate;
3119 Relation localrel = relinfo->ri_RelationDesc;
3120 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3121 EPQState epqstate;
3124 bool found;
3125
3126 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3127
3128 /* Caller should have opened indexes already. */
3129 Assert(relinfo->ri_IndexRelationDescs != NULL ||
3130 !localrel->rd_rel->relhasindex ||
3131 RelationGetIndexList(localrel) == NIL);
3132
3133 found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3135
3136 /* If found delete it. */
3137 if (found)
3138 {
3139 /*
3140 * Report the conflict if the tuple was modified by a different
3141 * origin.
3142 */
3144 &conflicttuple.origin, &conflicttuple.ts) &&
3146 {
3147 conflicttuple.slot = localslot;
3151 }
3152
3153 EvalPlanQualSetSlot(&epqstate, localslot);
3154
3155 /* Do the actual delete. */
3156 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3157 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3158 }
3159 else
3160 {
3161 /*
3162 * The tuple to be deleted could not be found. Do nothing except for
3163 * emitting a log message.
3164 */
3167 }
3168
3169 /* Cleanup. */
3170 EvalPlanQualEnd(&epqstate);
3171}
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition worker.c:3181
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition worker.c:2608
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:64
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:105
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
#define LOG
Definition elog.h:32
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam, List *resultRelations)
Definition execMain.c:2747
void EvalPlanQualEnd(EPQState *epqstate)
Definition execMain.c:3208
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition executor.h:290
#define ACL_DELETE
Definition parsenodes.h:79
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
List * RelationGetIndexList(Relation relation)
Definition relcache.c:4827
ReplOriginId origin
Definition origin.h:45

References ACL_DELETE, Assert, CT_DELETE_MISSING, CT_DELETE_ORIGIN_DIFFERS, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecSimpleRelationDelete(), fb(), FindReplTupleInLocalRel(), GetTupleTransactionInfo(), list_make1, LOG, NIL, ReplOriginXactState::origin, RelationData::rd_rel, RelationGetIndexList(), replorigin_xact_state, ReportApplyConflict(), and TargetPrivilegesCheck().

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2640 of file worker.c.

2641{
2644 LogicalRepRelId relid;
2647 EState *estate;
2650 bool run_as_owner;
2651
2652 /*
2653 * Quick return if we are skipping data modification changes or handling
2654 * streamed transactions.
2655 */
2656 if (is_skipping_changes() ||
2658 return;
2659
2661
2662 relid = logicalrep_read_insert(s, &newtup);
2665 {
2666 /*
2667 * The relation can't become interesting in the middle of the
2668 * transaction so it's safe to unlock it.
2669 */
2672 return;
2673 }
2674
2675 /*
2676 * Make sure that any user-supplied code runs as the table owner, unless
2677 * the user has opted out of that behavior.
2678 */
2680 if (!run_as_owner)
2681 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2682
2683 /* Set relation for error callback */
2685
2686 /* Initialize the executor state. */
2688 estate = edata->estate;
2691 &TTSOpsVirtual);
2692
2693 /* Process and store remote tuple in the slot */
2696 slot_fill_defaults(rel, estate, remoteslot);
2698
2699 /* For a partitioned table, insert the tuple into a partition. */
2700 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2703 else
2704 {
2705 ResultRelInfo *relinfo = edata->targetRelInfo;
2706
2707 ExecOpenIndices(relinfo, false);
2710 }
2711
2713
2714 /* Reset relation for error callback */
2716
2717 if (!run_as_owner)
2719
2721
2723}
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition worker.c:2731
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition worker.c:966
@ CMD_INSERT
Definition nodes.h:277
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition proto.c:428

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ExecCloseIndices(), ExecInitExtraTupleSlot(), ExecOpenIndices(), fb(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), SwitchToUntrustedUser(), and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2731 of file worker.c.

2734{
2735 EState *estate = edata->estate;
2736
2737 /* Caller should have opened indexes already. */
2738 Assert(relinfo->ri_IndexRelationDescs != NULL ||
2739 !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2740 RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2741
2742 /* Caller will not have done this bit. */
2743 Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2745
2746 /* Do the insert. */
2747 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2749}
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:140
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition parsenodes.h:76

References ACL_INSERT, Assert, ExecSimpleRelationInsert(), fb(), InitConflictIndexes(), NIL, RelationGetIndexList(), and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1673 of file worker.c.

1674{
1675 /*
1676 * ORIGIN message can only come inside streaming transaction or inside
1677 * remote transaction and before any actual writes.
1678 */
1682 ereport(ERROR,
1684 errmsg_internal("ORIGIN message sent out of order")));
1685}
static bool in_streamed_transaction
Definition worker.c:493

References am_tablesync_worker(), ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1338 of file worker.c.

1339{
1341
1343
1344 if (prepare_data.prepare_lsn != remote_final_lsn)
1345 ereport(ERROR,
1347 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1348 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1350
1351 /*
1352 * Unlike commit, here, we always prepare the transaction even though no
1353 * change has happened in this transaction or all changes are skipped. It
1354 * is done this way because at commit prepared time, we won't know whether
1355 * we have skipped preparing a transaction because of those reasons.
1356 *
1357 * XXX, We can optimize such that at commit prepared time, we first check
1358 * whether we have prepared the transaction or not but that doesn't seem
1359 * worthwhile because such cases shouldn't be common.
1360 */
1362
1364
1367 pgstat_report_stat(false);
1368
1369 /*
1370 * It is okay not to set the local_end LSN for the prepare because we
1371 * always flush the prepare record. So, we can send the acknowledgment of
1372 * the remote_end LSN as soon as prepare is finished.
1373 *
1374 * XXX For the sake of consistency with commit, we could have set it with
1375 * the LSN of prepare but as of now we don't track that value similar to
1376 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1377 * it.
1378 */
1380
1381 in_remote_transaction = false;
1382
1383 /*
1384 * Process any tables that are being synchronized in parallel, as well as
1385 * any newly added tables or sequences.
1386 */
1388
1389 /*
1390 * Since we have already prepared the transaction, in a case where the
1391 * server crashes before clearing the subskiplsn, it will be left but the
1392 * transaction won't be resent. But that's okay because it's a rare case
1393 * and the subskiplsn will be cleared when finishing the next transaction.
1394 */
1397
1400}
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition worker.c:1301
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:228
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), ProcessSyncingRelations(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1301 of file worker.c.

1302{
1303 char gid[GIDSIZE];
1304
1305 /*
1306 * Compute unique GID for two_phase transactions. We don't use GID of
1307 * prepared transaction sent by server as that can lead to deadlock when
1308 * we have multiple subscriptions from same node point to publications on
1309 * the same node. See comments atop worker.c
1310 */
1312 gid, sizeof(gid));
1313
1314 /*
1315 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1316 * called within the PrepareTransactionBlock below.
1317 */
1318 if (!IsTransactionBlock())
1319 {
1321 CommitTransactionCommand(); /* Completes the preceding Begin command. */
1322 }
1323
1324 /*
1325 * Update origin state so we can restart streaming from correct position
1326 * in case of crash.
1327 */
1330
1332}
bool PrepareTransactionBlock(const char *gid)
Definition xact.c:4043
void BeginTransactionBlock(void)
Definition xact.c:3975

References BeginTransactionBlock(), CommitTransactionCommand(), fb(), GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, PrepareTransactionBlock(), replorigin_xact_state, and TwoPhaseTransactionGid().

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2570 of file worker.c.

2571{
2572 LogicalRepRelation *rel;
2573
2575 return;
2576
2577 rel = logicalrep_read_rel(s);
2579
2580 /* Also reset all entries in the partition map that refer to remoterel. */
2582}
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition proto.c:698
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition relation.c:585
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1464 of file worker.c.

1465{
1467 char gid[GIDSIZE];
1468
1471
1472 /* Compute GID for two_phase transactions. */
1474 gid, sizeof(gid));
1475
1476 /*
1477 * It is possible that we haven't received prepare because it occurred
1478 * before walsender reached a consistent point or the two_phase was still
1479 * not enabled by that time, so in such cases, we need to skip rollback
1480 * prepared.
1481 */
1482 if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1483 rollback_data.prepare_time))
1484 {
1485 /*
1486 * Update origin state so we can restart streaming from correct
1487 * position in case of crash.
1488 */
1491
1492 /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1494 FinishPreparedTransaction(gid, false);
1497
1499 }
1500
1501 pgstat_report_stat(false);
1502
1503 /*
1504 * It is okay not to set the local_end LSN for the rollback of prepared
1505 * transaction because we always flush the WAL record for it. See
1506 * apply_handle_prepare.
1507 */
1509 in_remote_transaction = false;
1510
1511 /*
1512 * Process any tables that are being synchronized in parallel, as well as
1513 * any newly added tables or sequences.
1514 */
1515 ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1516
1519}
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition proto.c:325
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition twophase.c:2694

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), fb(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, InvalidXLogRecPtr, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, ReplOriginXactState::origin_lsn, ReplOriginXactState::origin_timestamp, pgstat_report_activity(), pgstat_report_stat(), ProcessSyncingRelations(), replorigin_xact_state, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), and TwoPhaseTransactionGid().

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 2078 of file worker.c.

2079{
2080 TransactionId xid;
2081 TransactionId subxid;
2085
2086 /* Save the message before it is consumed. */
2088 bool toplevel_xact;
2089
2091 ereport(ERROR,
2093 errmsg_internal("STREAM ABORT message without STREAM STOP")));
2094
2095 /* We receive abort information only when we can apply in parallel. */
2098
2099 xid = abort_data.xid;
2100 subxid = abort_data.subxid;
2101 toplevel_xact = (xid == subxid);
2102
2103 set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2104
2106
2107 switch (apply_action)
2108 {
2109 case TRANS_LEADER_APPLY:
2110
2111 /*
2112 * We are in the leader apply worker and the transaction has been
2113 * serialized to file.
2114 */
2115 stream_abort_internal(xid, subxid);
2116
2117 elog(DEBUG1, "finished processing the STREAM ABORT command");
2118 break;
2119
2121 Assert(winfo);
2122
2123 /*
2124 * For the case of aborting the subtransaction, we increment the
2125 * number of streaming blocks and take the lock again before
2126 * sending the STREAM_ABORT to ensure that the parallel apply
2127 * worker will wait on the lock for the next set of changes after
2128 * processing the STREAM_ABORT message if it is not already
2129 * waiting for STREAM_STOP message.
2130 *
2131 * It is important to perform this locking before sending the
2132 * STREAM_ABORT message so that the leader can hold the lock first
2133 * and the parallel apply worker will wait for the leader to
2134 * release the lock. This is the same as what we do in
2135 * apply_handle_stream_stop. See Locking Considerations atop
2136 * applyparallelworker.c.
2137 */
2138 if (!toplevel_xact)
2139 {
2143 }
2144
2145 if (pa_send_data(winfo, s->len, s->data))
2146 {
2147 /*
2148 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2149 * wait here for the parallel apply worker to finish as that
2150 * is not required to maintain the commit order and won't have
2151 * the risk of failures due to transaction dependencies and
2152 * deadlocks. However, it is possible that before the parallel
2153 * worker finishes and we clear the worker info, the xid
2154 * wraparound happens on the upstream and a new transaction
2155 * with the same xid can appear and that can lead to duplicate
2156 * entries in ParallelApplyTxnHash. Yet another problem could
2157 * be that we may have serialized the changes in partial
2158 * serialize mode and the file containing xact changes may
2159 * already exist, and after xid wraparound trying to create
2160 * the file for the same xid can lead to an error. To avoid
2161 * these problems, we decide to wait for the aborts to finish.
2162 *
2163 * Note, it is okay to not update the flush location position
2164 * for aborts as in worst case that means such a transaction
2165 * won't be sent again after restart.
2166 */
2167 if (toplevel_xact)
2169
2170 break;
2171 }
2172
2173 /*
2174 * Switch to serialize mode when we are not able to send the
2175 * change to parallel apply worker.
2176 */
2177 pa_switch_to_partial_serialize(winfo, true);
2178
2181 Assert(winfo);
2182
2183 /*
2184 * Parallel apply worker might have applied some changes, so write
2185 * the STREAM_ABORT message so that it can rollback the
2186 * subtransaction if needed.
2187 */
2189 &original_msg);
2190
2191 if (toplevel_xact)
2192 {
2195 }
2196 break;
2197
2199
2200 /*
2201 * If the parallel apply worker is applying spooled messages then
2202 * close the file before aborting.
2203 */
2204 if (toplevel_xact && stream_fd)
2206
2208
2209 /*
2210 * We need to wait after processing rollback to savepoint for the
2211 * next set of changes.
2212 *
2213 * We have a race condition here due to which we can start waiting
2214 * here when there are more chunk of streams in the queue. See
2215 * apply_handle_stream_stop.
2216 */
2217 if (!toplevel_xact)
2219
2220 elog(DEBUG1, "finished processing the STREAM ABORT command");
2221 break;
2222
2223 default:
2224 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2225 break;
2226 }
2227
2229}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition atomics.h:424
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition worker.c:6379
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition worker.c:5541
static BufFile * stream_fd
Definition worker.c:525
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition worker.c:1995
static void stream_close_file(void)
Definition worker.c:5493
#define pg_fallthrough
Definition c.h:161
uint32 TransactionId
Definition c.h:736
#define DEBUG1
Definition elog.h:31
#define elog(elevel,...)
Definition elog.h:228
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
#define AccessExclusiveLock
Definition lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition proto.c:1187
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE

References AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pg_fallthrough, reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, and TRANS_PARALLEL_APPLY.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2397 of file worker.c.

2398{
2399 TransactionId xid;
2403
2404 /* Save the message before it is consumed. */
2406
2408 ereport(ERROR,
2410 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2411
2414
2416
2417 switch (apply_action)
2418 {
2419 case TRANS_LEADER_APPLY:
2420
2421 /*
2422 * The transaction has been serialized to file, so replay all the
2423 * spooled operations.
2424 */
2426 commit_data.commit_lsn);
2427
2429
2430 /* Unlink the files with serialized changes and subxact info. */
2432
2433 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2434 break;
2435
2437 Assert(winfo);
2438
2439 if (pa_send_data(winfo, s->len, s->data))
2440 {
2441 /* Finish processing the streaming transaction. */
2442 pa_xact_finish(winfo, commit_data.end_lsn);
2443 break;
2444 }
2445
2446 /*
2447 * Switch to serialize mode when we are not able to send the
2448 * change to parallel apply worker.
2449 */
2450 pa_switch_to_partial_serialize(winfo, true);
2451
2454 Assert(winfo);
2455
2457 &original_msg);
2458
2460
2461 /* Finish processing the streaming transaction. */
2462 pa_xact_finish(winfo, commit_data.end_lsn);
2463 break;
2464
2466
2467 /*
2468 * If the parallel apply worker is applying spooled messages then
2469 * close the file before committing.
2470 */
2471 if (stream_fd)
2473
2475
2477
2478 /*
2479 * It is important to set the transaction state as finished before
2480 * releasing the lock. See pa_wait_for_xact_finish.
2481 */
2484
2486
2487 elog(DEBUG1, "finished processing the STREAM COMMIT command");
2488 break;
2489
2490 default:
2491 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2492 break;
2493 }
2494
2495 /*
2496 * Process any tables that are being synchronized in parallel, as well as
2497 * any newly added tables or sequences.
2498 */
2500
2502
2504}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition worker.c:5424
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition worker.c:2267
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition proto.c:1132
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pg_fallthrough, pgstat_report_activity(), ProcessSyncingRelations(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1525 of file worker.c.

1526{
1530
1531 /* Save the message before it is consumed. */
1533
1535 ereport(ERROR,
1537 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1538
1539 /* Tablesync should never receive prepare. */
1540 if (am_tablesync_worker())
1541 ereport(ERROR,
1543 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1544
1547
1549
1550 switch (apply_action)
1551 {
1552 case TRANS_LEADER_APPLY:
1553
1554 /*
1555 * The transaction has been serialized to file, so replay all the
1556 * spooled operations.
1557 */
1559 prepare_data.xid, prepare_data.prepare_lsn);
1560
1561 /* Mark the transaction as prepared. */
1563
1565
1566 /*
1567 * It is okay not to set the local_end LSN for the prepare because
1568 * we always flush the prepare record. See apply_handle_prepare.
1569 */
1571
1572 in_remote_transaction = false;
1573
1574 /* Unlink the files with serialized changes and subxact info. */
1576
1577 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1578 break;
1579
1581 Assert(winfo);
1582
1583 if (pa_send_data(winfo, s->len, s->data))
1584 {
1585 /* Finish processing the streaming transaction. */
1586 pa_xact_finish(winfo, prepare_data.end_lsn);
1587 break;
1588 }
1589
1590 /*
1591 * Switch to serialize mode when we are not able to send the
1592 * change to parallel apply worker.
1593 */
1594 pa_switch_to_partial_serialize(winfo, true);
1595
1598 Assert(winfo);
1599
1602 &original_msg);
1603
1605
1606 /* Finish processing the streaming transaction. */
1607 pa_xact_finish(winfo, prepare_data.end_lsn);
1608 break;
1609
1611
1612 /*
1613 * If the parallel apply worker is applying spooled messages then
1614 * close the file before preparing.
1615 */
1616 if (stream_fd)
1618
1620
1621 /* Mark the transaction as prepared. */
1623
1625
1627
1628 /*
1629 * It is okay not to set the local_end LSN for the prepare because
1630 * we always flush the prepare record. See apply_handle_prepare.
1631 */
1633
1636
1638
1639 elog(DEBUG1, "finished processing the STREAM PREPARE command");
1640 break;
1641
1642 default:
1643 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1644 break;
1645 }
1646
1647 pgstat_report_stat(false);
1648
1649 /*
1650 * Process any tables that are being synchronized in parallel, as well as
1651 * any newly added tables or sequences.
1652 */
1654
1655 /*
1656 * Similar to prepare case, the subskiplsn could be left in a case of
1657 * server crash but it's okay. See the comments in apply_handle_prepare().
1658 */
1661
1663
1665}
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition proto.c:365

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert, begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog, end_replication_step(), ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pg_fallthrough, pgstat_report_activity(), pgstat_report_stat(), ProcessSyncingRelations(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1732 of file worker.c.

1733{
1734 bool first_segment;
1737
1738 /* Save the message before it is consumed. */
1740
1742 ereport(ERROR,
1744 errmsg_internal("duplicate STREAM START message")));
1745
1746 /* There must not be an active streaming transaction. */
1748
1749 /* notify handle methods we're processing a remote transaction */
1751
1752 /* extract XID of the top-level transaction */
1754
1756 ereport(ERROR,
1758 errmsg_internal("invalid transaction ID in streamed replication transaction")));
1759
1761
1762 /* Try to allocate a worker for the streaming transaction. */
1763 if (first_segment)
1765
1767
1768 switch (apply_action)
1769 {
1771
1772 /*
1773 * Function stream_start_internal starts a transaction. This
1774 * transaction will be committed on the stream stop unless it is a
1775 * tablesync worker in which case it will be committed after
1776 * processing all the messages. We need this transaction for
1777 * handling the BufFile, used for serializing the streaming data
1778 * and subxact info.
1779 */
1781 break;
1782
1784 Assert(winfo);
1785
1786 /*
1787 * Once we start serializing the changes, the parallel apply
1788 * worker will wait for the leader to release the stream lock
1789 * until the end of the transaction. So, we don't need to release
1790 * the lock or increment the stream count in that case.
1791 */
1792 if (pa_send_data(winfo, s->len, s->data))
1793 {
1794 /*
1795 * Unlock the shared object lock so that the parallel apply
1796 * worker can continue to receive changes.
1797 */
1798 if (!first_segment)
1800
1801 /*
1802 * Increment the number of streaming blocks waiting to be
1803 * processed by parallel apply worker.
1804 */
1806
1807 /* Cache the parallel apply worker for this transaction. */
1809 break;
1810 }
1811
1812 /*
1813 * Switch to serialize mode when we are not able to send the
1814 * change to parallel apply worker.
1815 */
1817
1820 Assert(winfo);
1821
1822 /*
1823 * Open the spool file unless it was already opened when switching
1824 * to serialize mode. The transaction started in
1825 * stream_start_internal will be committed on the stream stop.
1826 */
1829
1831
1832 /* Cache the parallel apply worker for this transaction. */
1834 break;
1835
1837 if (first_segment)
1838 {
1839 /* Hold the lock until the end of the transaction. */
1842
1843 /*
1844 * Signal the leader apply worker, as it may be waiting for
1845 * us.
1846 */
1849 }
1850
1852 break;
1853
1854 default:
1855 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1856 break;
1857 }
1858
1860}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition worker.c:501
static void stream_write_change(char action, StringInfo s)
Definition worker.c:5511
void stream_start_internal(TransactionId xid, bool first_segment)
Definition worker.c:1694
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:733
#define InvalidOid
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition proto.c:1082
@ PARALLEL_TRANS_STARTED
@ WORKERTYPE_APPLY

References AccessExclusiveLock, Assert, StringInfoData::data, elog, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pg_fallthrough, pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, WORKERTYPE_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1892 of file worker.c.

1893{
1896
1898 ereport(ERROR,
1900 errmsg_internal("STREAM STOP message without STREAM START")));
1901
1903
1904 switch (apply_action)
1905 {
1908 break;
1909
1911 Assert(winfo);
1912
1913 /*
1914 * Lock before sending the STREAM_STOP message so that the leader
1915 * can hold the lock first and the parallel apply worker will wait
1916 * for leader to release the lock. See Locking Considerations atop
1917 * applyparallelworker.c.
1918 */
1920
1921 if (pa_send_data(winfo, s->len, s->data))
1922 {
1924 break;
1925 }
1926
1927 /*
1928 * Switch to serialize mode when we are not able to send the
1929 * change to parallel apply worker.
1930 */
1931 pa_switch_to_partial_serialize(winfo, true);
1932
1938 break;
1939
1941 elog(DEBUG1, "applied %u changes in the streaming chunk",
1943
1944 /*
1945 * By the time parallel apply worker is processing the changes in
1946 * the current streaming block, the leader apply worker may have
1947 * sent multiple streaming blocks. This can lead to parallel apply
1948 * worker start waiting even when there are more chunk of streams
1949 * in the queue. So, try to lock only if there is no message left
1950 * in the queue. See Locking Considerations atop
1951 * applyparallelworker.c.
1952 *
1953 * Note that here we have a race condition where we can start
1954 * waiting even when there are pending streaming chunks. This can
1955 * happen if the leader sends another streaming block and acquires
1956 * the stream lock again after the parallel apply worker checks
1957 * that there is no pending streaming block and before it actually
1958 * starts waiting on a lock. We can handle this case by not
1959 * allowing the leader to increment the stream block count during
1960 * the time parallel apply worker acquires the lock but it is not
1961 * clear whether that is worth the complexity.
1962 *
1963 * Now, if this missed chunk contains rollback to savepoint, then
1964 * there is a risk of deadlock which probably shouldn't happen
1965 * after restart.
1966 */
1968 break;
1969
1970 default:
1971 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1972 break;
1973 }
1974
1977
1978 /*
1979 * The parallel apply worker could be in a transaction in which case we
1980 * need to report the state as STATE_IDLEINTRANSACTION.
1981 */
1984 else
1986
1988}
void stream_stop_internal(TransactionId xid)
Definition worker.c:1869
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5040

References AccessExclusiveLock, Assert, StringInfoData::data, DEBUG1, elog, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pg_fallthrough, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3654 of file worker.c.

3655{
3656 bool cascade = false;
3657 bool restart_seqs = false;
3659 List *remote_rels = NIL;
3660 List *rels = NIL;
3661 List *part_rels = NIL;
3662 List *relids = NIL;
3664 ListCell *lc;
3665 LOCKMODE lockmode = AccessExclusiveLock;
3666
3667 /*
3668 * Quick return if we are skipping data modification changes or handling
3669 * streamed transactions.
3670 */
3671 if (is_skipping_changes() ||
3673 return;
3674
3676
3677 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3678
3679 foreach(lc, remote_relids)
3680 {
3681 LogicalRepRelId relid = lfirst_oid(lc);
3683
3684 rel = logicalrep_rel_open(relid, lockmode);
3686 {
3687 /*
3688 * The relation can't become interesting in the middle of the
3689 * transaction so it's safe to unlock it.
3690 */
3691 logicalrep_rel_close(rel, lockmode);
3692 continue;
3693 }
3694
3697 rels = lappend(rels, rel->localrel);
3698 relids = lappend_oid(relids, rel->localreloid);
3701
3702 /*
3703 * Truncate partitions if we got a message to truncate a partitioned
3704 * table.
3705 */
3706 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3707 {
3708 ListCell *child;
3709 List *children = find_all_inheritors(rel->localreloid,
3710 lockmode,
3711 NULL);
3712
3713 foreach(child, children)
3714 {
3715 Oid childrelid = lfirst_oid(child);
3717
3718 if (list_member_oid(relids, childrelid))
3719 continue;
3720
3721 /* find_all_inheritors already got lock */
3723
3724 /*
3725 * Ignore temp tables of other backends. See similar code in
3726 * ExecuteTruncate().
3727 */
3729 {
3730 table_close(childrel, lockmode);
3731 continue;
3732 }
3733
3735 rels = lappend(rels, childrel);
3737 relids = lappend_oid(relids, childrelid);
3738 /* Log this relation only if needed for logical decoding */
3741 }
3742 }
3743 }
3744
3745 /*
3746 * Even if we used CASCADE on the upstream primary we explicitly default
3747 * to replaying changes without further cascading. This might be later
3748 * changeable with a user specified option.
3749 *
3750 * MySubscription->runasowner tells us whether we want to execute
3751 * replication actions as the subscription owner; the last argument to
3752 * TruncateGuts tells it whether we want to switch to the table owner.
3753 * Those are exactly opposite conditions.
3754 */
3756 relids,
3759 restart_seqs,
3761 foreach(lc, remote_rels)
3762 {
3764
3766 }
3767 foreach(lc, part_rels)
3768 {
3769 Relation rel = lfirst(lc);
3770
3771 table_close(rel, NoLock);
3772 }
3773
3775}
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
int LOCKMODE
Definition lockdefs.h:26
@ DROP_RESTRICT
#define ACL_TRUNCATE
Definition parsenodes.h:80
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define lfirst_oid(lc)
Definition pg_list.h:174
unsigned int Oid
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition proto.c:615
#define RelationIsLogicallyLogged(relation)
Definition rel.h:712
#define RELATION_IS_OTHER_TEMP(relation)
Definition rel.h:669
Definition pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs, bool run_as_table_owner)
Definition tablecmds.c:2014

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), fb(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), MySubscription, NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, Subscription::runasowner, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 3358 of file worker.c.

3362{
3363 EState *estate = edata->estate;
3364 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3365 ResultRelInfo *relinfo = edata->targetRelInfo;
3366 Relation parentrel = relinfo->ri_RelationDesc;
3367 ModifyTableState *mtstate;
3368 PartitionTupleRouting *proute;
3370 Relation partrel;
3372 TupleConversionMap *map;
3375 AttrMap *attrmap = NULL;
3376
3377 /* ModifyTableState is needed for ExecFindPartition(). */
3378 edata->mtstate = mtstate = makeNode(ModifyTableState);
3379 mtstate->ps.plan = NULL;
3380 mtstate->ps.state = estate;
3381 mtstate->operation = operation;
3382 mtstate->resultRelInfo = relinfo;
3383
3384 /* ... as is PartitionTupleRouting. */
3385 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3386
3387 /*
3388 * Find the partition to which the "search tuple" belongs.
3389 */
3392 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3393 remoteslot, estate);
3395 partrel = partrelinfo->ri_RelationDesc;
3396
3397 /*
3398 * Check for supported relkind. We need this since partitions might be of
3399 * unsupported relkinds; and the set of partitions can change, so checking
3400 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3401 */
3402 CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3403 relmapentry->remoterel.relkind,
3405 RelationGetRelationName(partrel));
3406
3407 /*
3408 * To perform any of the operations below, the tuple must match the
3409 * partition's rowtype. Convert if needed or just copy, using a dedicated
3410 * slot to store the tuple in any case.
3411 */
3412 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3413 if (remoteslot_part == NULL)
3414 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3415 map = ExecGetRootToChildMap(partrelinfo, estate);
3416 if (map != NULL)
3417 {
3418 attrmap = map->attrMap;
3421 }
3422 else
3423 {
3426 }
3428
3429 /* Check if we can do the update or delete on the leaf partition. */
3431 {
3432 part_entry = logicalrep_partition_open(relmapentry, partrel,
3433 attrmap);
3435 }
3436
3437 switch (operation)
3438 {
3439 case CMD_INSERT:
3442 break;
3443
3444 case CMD_DELETE:
3447 part_entry->localindexoid);
3448 break;
3449
3450 case CMD_UPDATE:
3451
3452 /*
3453 * For UPDATE, depending on whether or not the updated tuple
3454 * satisfies the partition's constraint, perform a simple UPDATE
3455 * of the partition or move the updated tuple into a different
3456 * suitable partition.
3457 */
3458 {
3462 bool found;
3463 EPQState epqstate;
3465
3466 /* Get the matching local tuple from the partition. */
3467 found = FindReplTupleInLocalRel(edata, partrel,
3468 &part_entry->remoterel,
3469 part_entry->localindexoid,
3471 if (!found)
3472 {
3475
3476 /*
3477 * Detecting whether the tuple was recently deleted or
3478 * never existed is crucial to avoid misleading the user
3479 * during conflict handling.
3480 */
3481 if (FindDeletedTupleInLocalRel(partrel,
3482 part_entry->localindexoid,
3484 &conflicttuple.xmin,
3485 &conflicttuple.origin,
3486 &conflicttuple.ts) &&
3489 else
3491
3492 /* Store the new tuple for conflict reporting */
3494
3495 /*
3496 * The tuple to be updated could not be found or was
3497 * deleted. Do nothing except for emitting a log message.
3498 */
3502
3503 return;
3504 }
3505
3506 /*
3507 * Report the conflict if the tuple was modified by a
3508 * different origin.
3509 */
3511 &conflicttuple.origin,
3512 &conflicttuple.ts) &&
3514 {
3516
3517 /* Store the new tuple for conflict reporting */
3518 newslot = table_slot_create(partrel, &estate->es_tupleTable);
3520
3521 conflicttuple.slot = localslot;
3522
3526 }
3527
3528 /*
3529 * Apply the update to the local tuple, putting the result in
3530 * remoteslot_part.
3531 */
3534 newtup);
3536
3537 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3538
3539 /*
3540 * Does the updated tuple still satisfy the current
3541 * partition's constraint?
3542 */
3543 if (!partrel->rd_rel->relispartition ||
3545 false))
3546 {
3547 /*
3548 * Yes, so simply UPDATE the partition. We don't call
3549 * apply_handle_update_internal() here, which would
3550 * normally do the following work, to avoid repeating some
3551 * work already done above to find the local tuple in the
3552 * partition.
3553 */
3555
3557 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3558 ACL_UPDATE);
3559 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3561 }
3562 else
3563 {
3564 /* Move the tuple into the new partition. */
3565
3566 /*
3567 * New partition will be found using tuple routing, which
3568 * can only occur via the parent table. We might need to
3569 * convert the tuple to the parent's rowtype. Note that
3570 * this is the tuple found in the partition, not the
3571 * original search tuple received by this function.
3572 */
3573 if (map)
3574 {
3578
3579 remoteslot =
3582 }
3583 else
3584 {
3587 }
3588
3589 /* Find the new partition. */
3592 proute, remoteslot,
3593 estate);
3596 partrel_new = partrelinfo_new->ri_RelationDesc;
3597
3598 /* Check that new partition also has supported relkind. */
3599 CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3600 relmapentry->remoterel.relkind,
3603
3604 /* DELETE old tuple found in the old partition. */
3605 EvalPlanQualSetSlot(&epqstate, localslot);
3606 TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3607 ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3608
3609 /* INSERT new tuple into the new partition. */
3610
3611 /*
3612 * Convert the replacement tuple to match the destination
3613 * partition rowtype.
3614 */
3616 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3617 if (remoteslot_part == NULL)
3619 &estate->es_tupleTable);
3621 if (map != NULL)
3622 {
3624 remoteslot,
3626 }
3627 else
3628 {
3630 remoteslot);
3632 }
3636 }
3637
3638 EvalPlanQualEnd(&epqstate);
3639 }
3640 break;
3641
3642 default:
3643 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3644 break;
3645 }
3646}
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition worker.c:1125
static bool FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TupleTableSlot *remoteslot, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
Definition worker.c:3276
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_UPDATE_MISSING
Definition conflict.h:46
static DataChecksumsWorkerOperation operation
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1885
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition execUtils.c:1352
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
@ CMD_UPDATE
Definition nodes.h:276
#define makeNode(_type_)
Definition nodes.h:161
#define ACL_UPDATE
Definition parsenodes.h:78
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition relation.c:647
List * es_tupleTable
Definition execnodes.h:748
LogicalRepRelation remoterel
ResultRelInfo * resultRelInfo
Definition execnodes.h:1446
Plan * plan
Definition execnodes.h:1201
EState * state
Definition execnodes.h:1203
AttrMap * attrMap
Definition tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition tupconvert.c:103
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition tupconvert.c:193
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:390
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:543
const char * type

References ACL_DELETE, ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert, TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, elog, ERROR, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationDelete(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), fb(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LOG, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), NIL, operation, ModifyTableState::operation, ReplOriginXactState::origin, PlanState::plan, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelMapEntry::remoterel, replorigin_xact_state, ReportApplyConflict(), ModifyTableState::resultRelInfo, slot_getallattrs(), slot_modify_data(), slot_store_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), and type.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2593 of file worker.c.

2594{
2596
2598 return;
2599
2601}
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition proto.c:757

References fb(), handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2797 of file worker.c.

2798{
2800 LogicalRepRelId relid;
2803 EState *estate;
2806 bool has_oldtup;
2810 bool run_as_owner;
2811
2812 /*
2813 * Quick return if we are skipping data modification changes or handling
2814 * streamed transactions.
2815 */
2816 if (is_skipping_changes() ||
2818 return;
2819
2821
2823 &newtup);
2826 {
2827 /*
2828 * The relation can't become interesting in the middle of the
2829 * transaction so it's safe to unlock it.
2830 */
2833 return;
2834 }
2835
2836 /* Set relation for error callback */
2838
2839 /* Check if we can do the update. */
2841
2842 /*
2843 * Make sure that any user-supplied code runs as the table owner, unless
2844 * the user has opted out of that behavior.
2845 */
2847 if (!run_as_owner)
2848 SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2849
2850 /* Initialize the executor state. */
2852 estate = edata->estate;
2855 &TTSOpsVirtual);
2856
2857 /*
2858 * Populate updatedCols so that per-column triggers can fire, and so
2859 * executor can correctly pass down indexUnchanged hint. This could
2860 * include more columns than were actually changed on the publisher
2861 * because the logical replication protocol doesn't contain that
2862 * information. But it would for example exclude columns that only exist
2863 * on the subscriber, since we are not touching those.
2864 */
2866 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2867 {
2868 CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2869 int remoteattnum = rel->attrmap->attnums[i];
2870
2871 if (!att->attisdropped && remoteattnum >= 0)
2872 {
2873 Assert(remoteattnum < newtup.ncols);
2875 target_perminfo->updatedCols =
2876 bms_add_member(target_perminfo->updatedCols,
2878 }
2879 }
2880
2881 /* Build the search tuple. */
2884 has_oldtup ? &oldtup : &newtup);
2886
2887 /* For a partitioned table, apply update to correct partition. */
2888 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2891 else
2894
2896
2897 /* Reset relation for error callback */
2899
2900 if (!run_as_owner)
2902
2904
2906}
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition worker.c:2914
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
int i
Definition isn.c:77
#define LOGICALREP_COLUMN_UNCHANGED
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition proto.c:487
AttrNumber * attnums
Definition attmap.h:36
bool attisdropped
Definition tupdesc.h:78
List * es_rteperminfos
Definition execnodes.h:704
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:195

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert, CompactAttribute::attisdropped, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ExecInitExtraTupleSlot(), fb(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), MySubscription, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RestoreUserContext(), RowExclusiveLock, Subscription::runasowner, should_apply_changes_for_rel(), slot_store_data(), SwitchToUntrustedUser(), TTSOpsVirtual, and TupleDescCompactAttr().

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2914 of file worker.c.

2919{
2920 EState *estate = edata->estate;
2921 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2922 Relation localrel = relinfo->ri_RelationDesc;
2923 EPQState epqstate;
2926 bool found;
2928
2929 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2930 ExecOpenIndices(relinfo, false);
2931
2932 found = FindReplTupleInLocalRel(edata, localrel,
2933 &relmapentry->remoterel,
2934 localindexoid,
2936
2937 /*
2938 * Tuple found.
2939 *
2940 * Note this will fail if there are other conflicting unique indexes.
2941 */
2942 if (found)
2943 {
2944 /*
2945 * Report the conflict if the tuple was modified by a different
2946 * origin.
2947 */
2949 &conflicttuple.origin, &conflicttuple.ts) &&
2951 {
2953
2954 /* Store the new tuple for conflict reporting */
2955 newslot = table_slot_create(localrel, &estate->es_tupleTable);
2956 slot_store_data(newslot, relmapentry, newtup);
2957
2958 conflicttuple.slot = localslot;
2959
2963 }
2964
2965 /* Process and store remote tuple in the slot */
2969
2970 EvalPlanQualSetSlot(&epqstate, remoteslot);
2971
2973
2974 /* Do the actual update. */
2975 TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2976 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2977 remoteslot);
2978 }
2979 else
2980 {
2983
2984 /*
2985 * Detecting whether the tuple was recently deleted or never existed
2986 * is crucial to avoid misleading the user during conflict handling.
2987 */
2988 if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2989 &conflicttuple.xmin,
2990 &conflicttuple.origin,
2991 &conflicttuple.ts) &&
2994 else
2996
2997 /* Store the new tuple for conflict reporting */
2998 slot_store_data(newslot, relmapentry, newtup);
2999
3000 /*
3001 * The tuple to be updated could not be found or was deleted. Do
3002 * nothing except for emitting a log message.
3003 */
3006 }
3007
3008 /* Cleanup. */
3010 EvalPlanQualEnd(&epqstate);
3011}

References ACL_UPDATE, CT_UPDATE_DELETED, CT_UPDATE_MISSING, CT_UPDATE_ORIGIN_DIFFERS, EState::es_tupleTable, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), fb(), FindDeletedTupleInLocalRel(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, GetTupleTransactionInfo(), InitConflictIndexes(), list_make1, LOG, MemoryContextSwitchTo(), NIL, ReplOriginXactState::origin, LogicalRepRelMapEntry::remoterel, replorigin_xact_state, ReportApplyConflict(), slot_modify_data(), slot_store_data(), table_slot_create(), TargetPrivilegesCheck(), and type.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2267 of file worker.c.

2269{
2270 int nchanges;
2271 char path[MAXPGPATH];
2272 char *buffer = NULL;
2274 ResourceOwner oldowner;
2275 int fileno;
2276 pgoff_t offset;
2277
2280
2281 /* Make sure we have an open transaction */
2283
2284 /*
2285 * Allocate file handle and memory required to process all the messages in
2286 * TopTransactionContext to avoid them getting reset after each message is
2287 * processed.
2288 */
2290
2291 /* Open the spool file for the committed/prepared transaction */
2293 elog(DEBUG1, "replaying changes from file \"%s\"", path);
2294
2295 /*
2296 * Make sure the file is owned by the toplevel transaction so that the
2297 * file will not be accidentally closed when aborting a subtransaction.
2298 */
2299 oldowner = CurrentResourceOwner;
2301
2302 stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2303
2304 CurrentResourceOwner = oldowner;
2305
2306 buffer = palloc(BLCKSZ);
2307
2309
2310 remote_final_lsn = lsn;
2311
2312 /*
2313 * Make sure the handle apply_dispatch methods are aware we're in a remote
2314 * transaction.
2315 */
2316 in_remote_transaction = true;
2318
2320
2321 /*
2322 * Read the entries one by one and pass them through the same logic as in
2323 * apply_dispatch.
2324 */
2325 nchanges = 0;
2326 while (true)
2327 {
2329 size_t nbytes;
2330 int len;
2331
2333
2334 /* read length of the on-disk record */
2335 nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2336
2337 /* have we reached end of the file? */
2338 if (nbytes == 0)
2339 break;
2340
2341 /* do we have a correct length? */
2342 if (len <= 0)
2343 elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2344 len, path);
2345
2346 /* make sure we have sufficiently large buffer */
2347 buffer = repalloc(buffer, len);
2348
2349 /* and finally read the data into the buffer */
2350 BufFileReadExact(stream_fd, buffer, len);
2351
2352 BufFileTell(stream_fd, &fileno, &offset);
2353
2354 /* init a stringinfo using the buffer and call apply_dispatch */
2355 initReadOnlyStringInfo(&s2, buffer, len);
2356
2357 /* Ensure we are reading the data into our memory context. */
2359
2361
2363
2365
2366 nchanges++;
2367
2368 /*
2369 * It is possible the file has been closed because we have processed
2370 * the transaction end message like stream_commit in which case that
2371 * must be the last message.
2372 */
2373 if (!stream_fd)
2374 {
2375 ensure_last_message(stream_fileset, xid, fileno, offset);
2376 break;
2377 }
2378
2379 if (nchanges % 1000 == 0)
2380 elog(DEBUG1, "replayed %d changes from file \"%s\"",
2381 nchanges, path);
2382 }
2383
2384 if (stream_fd)
2386
2387 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2388 nchanges, path);
2389
2390 return;
2391}
MemoryContext ApplyMessageContext
Definition worker.c:476
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition worker.c:5410
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, pgoff_t offset)
Definition worker.c:2235
void apply_dispatch(StringInfo s)
Definition worker.c:3782
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition buffile.c:292
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition buffile.c:655
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition buffile.c:665
void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset)
Definition buffile.c:833
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopTransactionContext
Definition mcxt.c:171
void * repalloc(void *pointer, Size size)
Definition mcxt.c:1632
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define MAXPGPATH
const void size_t len
off_t pgoff_t
Definition port.h:421
char * s2
ResourceOwner TopTransactionResourceOwner
Definition resowner.c:175
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition stringinfo.h:157
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog, end_replication_step(), ensure_last_message(), ERROR, fb(), in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 5011 of file worker.c.

5012{
5014 {
5015 /*
5016 * Don't stop the parallel apply worker as the leader will detect the
5017 * subscription parameter change and restart logical replication later
5018 * anyway. This also prevents the leader from reporting errors when
5019 * trying to communicate with a stopped parallel apply worker, which
5020 * would accidentally disable subscriptions if disable_on_error was
5021 * set.
5022 */
5023 return;
5024 }
5025
5026 /*
5027 * Reset the last-start time for this apply worker so that the launcher
5028 * will restart it without waiting for wal_retrieve_retry_interval if the
5029 * subscription is still active, and so that we won't leak that hash table
5030 * entry if it isn't.
5031 */
5034
5035 proc_exit(0);
5036}
void proc_exit(int code)
Definition ipc.c:105
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), am_parallel_apply_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeLogRepWorker(), maybe_reread_subscription(), and resume_conflict_info_retention().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 5987 of file worker.c.

5988{
5990
5992
5994
5996
5998
5999 proc_exit(0);
6000}
static void run_apply_worker(void)
Definition worker.c:5666
bool InitializingApplyWorker
Definition worker.c:504
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5947
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202

References DatumGetInt32(), fb(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 6332 of file worker.c.

6333{
6335 {
6336 ListCell *lc;
6337
6340 {
6341 Oid subid = lfirst_oid(lc);
6342 List *workers;
6343 ListCell *lc2;
6344
6345 workers = logicalrep_workers_find(subid, true, false);
6346 foreach(lc2, workers)
6347 {
6349
6351 }
6352 }
6354 }
6355
6356 /* The List storage will be reclaimed automatically in xact cleanup. */
6358}
static List * on_commit_wakeup_workers_subids
Definition worker.c:487
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:756
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105

References fb(), lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ can_advance_nonremovable_xid()

static bool can_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4408 of file worker.c.

4409{
4410 /*
4411 * It is sufficient to manage non-removable transaction ID for a
4412 * subscription by the main apply worker to detect update_deleted reliably
4413 * even for table sync or parallel apply workers.
4414 */
4416 return false;
4417
4418 /* No need to advance if retaining dead tuples is not required */
4420 return false;
4421
4422 return true;
4423}

References am_leader_apply_worker(), MySubscription, and Subscription::retaindeadtuples.

Referenced by maybe_advance_nonremovable_xid().

◆ changes_filename()

static void changes_filename ( char path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 5410 of file worker.c.

5411{
5412 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5413}
#define snprintf
Definition port.h:260

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2756 of file worker.c.

2757{
2758 /*
2759 * For partitioned tables, we only need to care if the target partition is
2760 * updatable (aka has PK or RI defined for it).
2761 */
2762 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2763 return;
2764
2765 /* Updatable, no error. */
2766 if (rel->updatable)
2767 return;
2768
2769 /*
2770 * We are in error mode so it's fine this is somewhat slow. It's better to
2771 * give user correct error.
2772 */
2774 {
2775 ereport(ERROR,
2777 errmsg("publisher did not send replica identity column "
2778 "expected by the logical replication target relation \"%s.%s\"",
2779 rel->remoterel.nspname, rel->remoterel.relname)));
2780 }
2781
2782 ereport(ERROR,
2784 errmsg("logical replication target relation \"%s.%s\" has "
2785 "neither REPLICA IDENTITY index nor PRIMARY "
2786 "KEY and published relation does not have "
2787 "REPLICA IDENTITY FULL",
2788 rel->remoterel.nspname, rel->remoterel.relname)));
2789}
#define OidIsValid(objectId)
Definition c.h:858
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905

References ereport, errcode(), errmsg, ERROR, fb(), GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 5607 of file worker.c.

5608{
5611
5616}
static ApplySubXactData subxact_data
Definition worker.c:550
void pfree(void *pointer)
Definition mcxt.c:1616
uint32 nsubxacts
Definition worker.c:544
uint32 nsubxacts_max
Definition worker.c:545
SubXactInfo * subxacts
Definition worker.c:547
TransactionId subxact_last
Definition worker.c:546

References fb(), InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 6133 of file worker.c.

6134{
6135 Relation rel;
6137 HeapTuple tup;
6139 bool started_tx = false;
6140
6142 return;
6143
6144 if (!IsTransactionState())
6145 {
6147 started_tx = true;
6148 }
6149
6150 /*
6151 * Updating pg_subscription might involve TOAST table access, so ensure we
6152 * have a valid snapshot.
6153 */
6155
6156 /*
6157 * Protect subskiplsn of pg_subscription from being concurrently updated
6158 * while clearing it.
6159 */
6162
6164
6165 /* Fetch the existing tuple. */
6168
6169 if (!HeapTupleIsValid(tup))
6170 elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6171
6173
6174 /*
6175 * Clear the subskiplsn. If the user has already changed subskiplsn before
6176 * clearing it we don't update the catalog and the replication origin
6177 * state won't get advanced. So in the worst case, if the server crashes
6178 * before sending an acknowledgment of the flush position the transaction
6179 * will be sent again and the user needs to set subskiplsn again. We can
6180 * reduce the possibility by logging a replication origin WAL record to
6181 * advance the origin LSN instead but there is no way to advance the
6182 * origin timestamp and it doesn't seem to be worth doing anything about
6183 * it since it's a very rare case.
6184 */
6185 if (subform->subskiplsn == myskiplsn)
6186 {
6187 bool nulls[Natts_pg_subscription];
6190
6191 memset(values, 0, sizeof(values));
6192 memset(nulls, false, sizeof(nulls));
6193 memset(replaces, false, sizeof(replaces));
6194
6195 /* reset subskiplsn */
6198
6200 replaces);
6201 CatalogTupleUpdate(rel, &tup->t_self, tup);
6202
6203 if (myskiplsn != finish_lsn)
6205 errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6206 errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6207 LSN_FORMAT_ARGS(finish_lsn),
6209 }
6210
6212 table_close(rel, NoLock);
6213
6215
6216 if (started_tx)
6218}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define likely(x)
Definition c.h:437
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessShareLock
Definition lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
void PopActiveSnapshot(void)
Definition snapmgr.c:775
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg, ERROR, fb(), Form_pg_subscription, GETSTRUCT(), GetTransactionSnapshot(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), table_close(), table_open(), values, WARNING, and XLogRecPtrIsValid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData * create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 877 of file worker.c.

878{
880 EState *estate;
882 List *perminfos = NIL;
883 ResultRelInfo *resultRelInfo;
884
886 edata->targetRel = rel;
887
888 edata->estate = estate = CreateExecutorState();
889
891 rte->rtekind = RTE_RELATION;
892 rte->relid = RelationGetRelid(rel->localrel);
893 rte->relkind = rel->localrel->rd_rel->relkind;
894 rte->rellockmode = AccessShareLock;
895
897
900
901 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
902
903 /*
904 * Use Relation opened by logicalrep_rel_open() instead of opening it
905 * again.
906 */
907 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
908
909 /*
910 * We put the ResultRelInfo in the es_opened_result_relations list, even
911 * though we don't populate the es_result_relations array. That's a bit
912 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
913 *
914 * ExecOpenIndices() is not called here either, each execution path doing
915 * an apply operation being responsible for that.
916 */
918 lappend(estate->es_opened_result_relations, resultRelInfo);
919
920 estate->es_output_cid = GetCurrentCommandId(true);
921
922 /* Prepare to catch AFTER triggers. */
924
925 /* other fields of edata remain NULL for now */
926
927 return edata;
928}
Bitmapset * bms_make_singleton(int x)
Definition bitmapset.c:216
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition execMain.c:1271
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition execUtils.c:799
EState * CreateExecutorState(void)
Definition execUtils.c:90
#define palloc0_object(type)
Definition fe_memutils.h:75
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
#define RelationGetRelid(relation)
Definition rel.h:516
List * es_opened_result_relations
Definition execnodes.h:724
CommandId es_output_cid
Definition execnodes.h:718
void AfterTriggerBeginQuery(void)
Definition trigger.c:5141
CommandId GetCurrentCommandId(bool used)
Definition xact.c:831

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), bms_make_singleton(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ExecInitRangeTable(), fb(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0_object, RelationData::rd_rel, RelationGetRelid, and RTE_RELATION.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 6007 of file worker.c.

6008{
6009 /*
6010 * Emit the error message, and recover from the error state to an idle
6011 * state
6012 */
6014
6018
6020
6021 /*
6022 * Report the worker failed during sequence synchronization, table
6023 * synchronization, or apply.
6024 */
6026
6027 /* Disable the subscription */
6029
6030 /*
6031 * Updating pg_subscription might involve TOAST table access, so ensure we
6032 * have a valid snapshot.
6033 */
6035
6039
6040 /* Ensure we remove no-longer-useful entry for worker's start time */
6043
6044 /* Notify the subscription has been disabled and exit */
6045 ereport(LOG,
6046 errmsg("subscription \"%s\" has been disabled because of an error",
6048
6049 /*
6050 * Skip the track_commit_timestamp check when disabling the worker due to
6051 * an error, as verifying commit timestamps is unnecessary in this
6052 * context.
6053 */
6057
6058 proc_exit(0);
6059}
void EmitErrorReport(void)
Definition elog.c:1882
void FlushErrorState(void)
Definition elog.c:2062
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
void AbortOutOfAnyTransaction(void)
Definition xact.c:4913

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), ApplyLauncherForgetWorkerStartTime(), CheckSubDeadTupleRetention(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg, FlushErrorState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), PopActiveSnapshot(), proc_exit(), PushActiveSnapshot(), RESUME_INTERRUPTS, Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), LogicalRepWorker::subid, and WARNING.

Referenced by start_apply(), start_sequence_sync(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
pgoff_t  offset 
)
static

Definition at line 2235 of file worker.c.

2237{
2238 char path[MAXPGPATH];
2239 BufFile *fd;
2240 int last_fileno;
2242
2244
2246
2248
2249 fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2250
2251 BufFileSeek(fd, 0, 0, SEEK_END);
2253
2255
2257
2258 if (last_fileno != fileno || last_offset != offset)
2259 elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2260 path);
2261}
int BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence)
Definition buffile.c:741
void BufFileClose(BufFile *file)
Definition buffile.c:413
static int fd(const char *x, int i)

References Assert, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog, end_replication_step(), ERROR, fb(), fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindDeletedTupleInLocalRel()

static bool FindDeletedTupleInLocalRel ( Relation  localrel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TransactionId delete_xid,
ReplOriginId delete_origin,
TimestampTz delete_time 
)
static

Definition at line 3276 of file worker.c.

3280{
3282
3283 /*
3284 * Return false if either dead tuples are not retained or commit timestamp
3285 * data is not available.
3286 */
3288 return false;
3289
3290 /*
3291 * For conflict detection, we use the leader worker's
3292 * oldest_nonremovable_xid value instead of invoking
3293 * GetOldestNonRemovableTransactionId() or using the conflict detection
3294 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3295 * identify tuples that were recently deleted. These deleted tuples are no
3296 * longer visible to concurrent transactions. However, if a remote update
3297 * matches such a tuple, we log an update_deleted conflict.
3298 *
3299 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3300 * transaction IDs older than oldest_nonremovable_xid, for our current
3301 * purpose, it is acceptable to treat tuples deleted by transactions prior
3302 * to oldest_nonremovable_xid as update_missing conflicts.
3303 */
3305 {
3307 }
3308 else
3309 {
3310 LogicalRepWorker *leader;
3311
3312 /*
3313 * Obtain the information from the leader apply worker as only the
3314 * leader manages oldest_nonremovable_xid (see
3315 * maybe_advance_nonremovable_xid() for details).
3316 */
3320 false);
3321 if (!leader)
3322 {
3323 ereport(ERROR,
3325 errmsg("could not detect conflict as the leader apply worker has exited")));
3326 }
3327
3328 SpinLockAcquire(&leader->relmutex);
3330 SpinLockRelease(&leader->relmutex);
3332 }
3333
3334 /*
3335 * Return false if the leader apply worker has stopped retaining
3336 * information for detecting conflicts. This implies that update_deleted
3337 * can no longer be reliably detected.
3338 */
3340 return false;
3341
3342 if (OidIsValid(localidxoid) &&
3347 delete_time);
3348 else
3352}
static bool IsIndexUsableForFindingDeletedTuple(Oid localindexoid, TransactionId conflict_detection_xmin)
Definition worker.c:3242
bool track_commit_timestamp
Definition commit_ts.c:121
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:268
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
TransactionId oldest_nonremovable_xid

References am_leader_apply_worker(), ereport, errcode(), errmsg, ERROR, fb(), InvalidOid, IsIndexUsableForFindingDeletedTuple(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, MySubscription, OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, RelationFindDeletedTupleInfoByIndex(), RelationFindDeletedTupleInfoSeq(), LogicalRepWorker::relmutex, Subscription::retaindeadtuples, SpinLockAcquire(), SpinLockRelease(), LogicalRepWorker::subid, track_commit_timestamp, TransactionIdIsValid, and WORKERTYPE_APPLY.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( ApplyExecutionData edata,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 3181 of file worker.c.

3186{
3187 EState *estate = edata->estate;
3188 bool found;
3189
3190 /*
3191 * Regardless of the top-level operation, we're performing a read here, so
3192 * check for SELECT privileges.
3193 */
3195
3196 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3197
3199 (remoterel->replident == REPLICA_IDENTITY_FULL));
3200
3202 {
3203#ifdef USE_ASSERT_CHECKING
3205
3206 /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3208 (remoterel->replident == REPLICA_IDENTITY_FULL &&
3210 edata->targetRel->attrmap)));
3212#endif
3213
3214 found = RelationFindReplTupleByIndex(localrel, localidxoid,
3217 }
3218 else
3221
3222 return found;
3223}
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
@ LockTupleExclusive
Definition lockoptions.h:59
#define ACL_SELECT
Definition parsenodes.h:77
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
Definition relation.c:835

References AccessShareLock, ACL_SELECT, Assert, EState::es_tupleTable, fb(), GetRelationIdentityOrPK(), index_close(), index_open(), IsIndexUsableForReplicaIdentityFull(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), and TargetPrivilegesCheck().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 935 of file worker.c.

936{
937 EState *estate = edata->estate;
938
939 /* Handle any queued AFTER triggers. */
940 AfterTriggerEndQuery(estate);
941
942 /* Shut down tuple routing, if any was done. */
943 if (edata->proute)
944 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
945
946 /*
947 * Cleanup. It might seem that we should call ExecCloseResultRelations()
948 * here, but we intentionally don't. It would close the rel we added to
949 * es_opened_result_relations above, which is wrong because we took no
950 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
951 * any other relations opened during execution.
952 */
953 ExecResetTupleTable(estate->es_tupleTable, false);
954 FreeExecutorState(estate);
955 pfree(edata);
956}
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
void FreeExecutorState(EState *estate)
Definition execUtils.c:197
void AfterTriggerEndQuery(EState *estate)
Definition trigger.c:5161

References AfterTriggerEndQuery(), EState::es_tupleTable, ExecCleanupTupleRouting(), ExecResetTupleTable(), fb(), FreeExecutorState(), and pfree().

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_candidate_xid()

static void get_candidate_xid ( RetainDeadTuplesData rdt_data)
static

Definition at line 4460 of file worker.c.

4461{
4464
4465 /*
4466 * Use last_recv_time when applying changes in the loop to avoid
4467 * unnecessary system time retrieval. If last_recv_time is not available,
4468 * obtain the current timestamp.
4469 */
4470 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4471
4472 /*
4473 * Compute the candidate_xid and request the publisher status at most once
4474 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4475 * details on how this value is dynamically adjusted. This is to avoid
4476 * using CPU and network resources without making much progress.
4477 */
4478 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4479 rdt_data->xid_advance_interval))
4480 return;
4481
4482 /*
4483 * Immediately update the timer, even if the function returns later
4484 * without setting candidate_xid due to inactivity on the subscriber. This
4485 * avoids frequent calls to GetOldestActiveTransactionId.
4486 */
4487 rdt_data->candidate_xid_time = now;
4488
4489 /*
4490 * Consider transactions in the current database, as only dead tuples from
4491 * this database are required for conflict detection.
4492 */
4494
4495 /*
4496 * Oldest active transaction ID (oldest_running_xid) can't be behind any
4497 * of its previously computed value.
4498 */
4501
4502 /* Return if the oldest_nonremovable_xid cannot be advanced */
4505 {
4507 return;
4508 }
4509
4511
4512 rdt_data->candidate_xid = oldest_running_xid;
4514
4515 /* process the next phase */
4517}
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
Definition worker.c:4962
static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4430
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1775
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
int64 TimestampTz
Definition timestamp.h:39
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2845
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define TransactionIdEquals(id1, id2)
Definition transam.h:43

References adjust_xid_advance_interval(), Assert, fb(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), MyLogicalRepWorker, now(), LogicalRepWorker::oldest_nonremovable_xid, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, TimestampDifferenceExceeds(), TransactionIdEquals, and TransactionIdPrecedesOrEquals().

Referenced by process_rdt_phase_transition().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 3902 of file worker.c.

3904{
3905 dlist_mutable_iter iter;
3907
3909 *flush = InvalidXLogRecPtr;
3910
3912 {
3913 FlushPosition *pos =
3914 dlist_container(FlushPosition, node, iter.cur);
3915
3916 *write = pos->remote_end;
3917
3918 if (pos->local_end <= local_flush)
3919 {
3920 *flush = pos->remote_end;
3921 dlist_delete(iter.cur);
3922 pfree(pos);
3923 }
3924 else
3925 {
3926 /*
3927 * Don't want to uselessly iterate over the rest of the list which
3928 * could potentially be long. Instead get the last element and
3929 * grab the write position from there.
3930 */
3932 &lsn_mapping);
3933 *write = pos->remote_end;
3934 *have_pending_txes = true;
3935 return;
3936 }
3937 }
3938
3940}
static dlist_head lsn_mapping
Definition worker.c:313
static void dlist_delete(dlist_node *node)
Definition ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition ilist.h:336
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
#define write(a, b, c)
Definition win32.h:14
XLogRecPtr remote_end
Definition worker.c:310
XLogRecPtr local_end
Definition worker.c:309
dlist_node * cur
Definition ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6995

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, fb(), GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback(), and wait_for_local_flush().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 6379 of file worker.c.

6380{
6381 *winfo = NULL;
6382
6384 {
6385 return TRANS_PARALLEL_APPLY;
6386 }
6387
6388 /*
6389 * If we are processing this transaction using a parallel apply worker
6390 * then either we send the changes to the parallel worker or if the worker
6391 * is busy then serialize the changes to the file which will later be
6392 * processed by the parallel worker.
6393 */
6394 *winfo = pa_find_worker(xid);
6395
6396 if (*winfo && (*winfo)->serialize_changes)
6397 {
6399 }
6400 else if (*winfo)
6401 {
6403 }
6404
6405 /*
6406 * If there is no parallel worker involved to process this transaction
6407 * then we either directly apply the change or serialize it to a file
6408 * which will later be applied when the transaction finish message is
6409 * processed.
6410 */
6411 else if (in_streamed_transaction)
6412 {
6414 }
6415 else
6416 {
6417 return TRANS_LEADER_APPLY;
6418 }
6419}
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), fb(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 784 of file worker.c.

785{
790
792
793 /* not in streaming mode */
795 return false;
796
798
799 /*
800 * The parallel apply worker needs the xid in this message to decide
801 * whether to define a savepoint, so save the original message that has
802 * not moved the cursor after the xid. We will serialize this message to a
803 * file in PARTIAL_SERIALIZE mode.
804 */
805 original_msg = *s;
806
807 /*
808 * We should have received XID of the subxact as the first part of the
809 * message, so extract it.
810 */
812
816 errmsg_internal("invalid transaction ID in streamed replication transaction")));
817
818 switch (apply_action)
819 {
822
823 /* Add the new subxact to the array (unless already there). */
825
826 /* Write the change to the current file */
827 stream_write_change(action, s);
828 return true;
829
831 Assert(winfo);
832
833 /*
834 * XXX The publisher side doesn't always send relation/type update
835 * messages after the streaming transaction, so also update the
836 * relation/type in leader apply worker. See function
837 * cleanup_rel_sync_cache.
838 */
839 if (pa_send_data(winfo, s->len, s->data))
840 return (action != LOGICAL_REP_MSG_RELATION &&
841 action != LOGICAL_REP_MSG_TYPE);
842
843 /*
844 * Switch to serialize mode when we are not able to send the
845 * change to parallel apply worker.
846 */
847 pa_switch_to_partial_serialize(winfo, false);
848
852
853 /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
854 return (action != LOGICAL_REP_MSG_RELATION &&
855 action != LOGICAL_REP_MSG_TYPE);
856
859
860 /* Define a savepoint for a subxact if needed. */
862 return false;
863
864 default:
865 elog(ERROR, "unexpected apply action: %d", (int) apply_action);
866 return false; /* silence compiler warning */
867 }
868}
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition worker.c:5325
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414

References Assert, StringInfoData::data, elog, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg_internal(), ERROR, fb(), get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pg_fallthrough, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 5779 of file worker.c.

5780{
5781 /* Run as replica session replication role. */
5782 SetConfigOption("session_replication_role", "replica",
5784
5785 /* Connect to our database. */
5788 0);
5789
5790 /*
5791 * Set always-secure search path, so malicious users can't redirect user
5792 * code (e.g. pg_index.indexprs).
5793 */
5794 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5795
5797 "ApplyContext",
5799
5801
5802 /*
5803 * Lock the subscription to prevent it from being concurrently dropped,
5804 * then re-verify its existence. After the initialization, the worker will
5805 * be terminated gracefully if the subscription is dropped.
5806 */
5809
5811
5812 if (MySubscription)
5813 {
5815 }
5816 else
5817 {
5818 ereport(LOG,
5819 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5821
5822 /* Ensure we remove no-longer-useful entry for worker's start time */
5825
5826 proc_exit(0);
5827 }
5828
5829 MySubscriptionValid = true;
5830
5831 if (!MySubscription->enabled)
5832 {
5833 ereport(LOG,
5834 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5835 MySubscription->name)));
5836
5838 }
5839
5840 /*
5841 * Restart the worker if retain_dead_tuples was enabled during startup.
5842 *
5843 * At this point, the replication slot used for conflict detection might
5844 * not exist yet, or could be dropped soon if the launcher perceives
5845 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5846 * oldest_nonremovable_xid when the slot is absent or at risk of being
5847 * dropped, a restart is initiated.
5848 *
5849 * The oldest_nonremovable_xid should be initialized only when the
5850 * subscription's retention is active before launching the worker. See
5851 * logicalrep_worker_launch.
5852 */
5853 if (am_leader_apply_worker() &&
5857 {
5858 ereport(LOG,
5859 errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5860 MySubscription->name, "retain_dead_tuples"));
5861
5863 }
5864
5865 /* Setup synchronous commit according to the user's wishes */
5866 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5868
5869 /* Change wal_receiver_timeout according to the user's wishes */
5871
5872 /*
5873 * Keep us informed about subscription or role changes. Note that the
5874 * role's superuser privilege can be revoked.
5875 */
5878 (Datum) 0);
5879 /* Changes to foreign servers may affect subscriptions using SERVER. */
5882 (Datum) 0);
5883 /* Changes to user mappings may affect subscriptions using SERVER. */
5886 (Datum) 0);
5887
5888 /*
5889 * Changes to FDW connection_function may affect subscriptions using
5890 * SERVER.
5891 */
5894 (Datum) 0);
5895
5898 (Datum) 0);
5899
5900 if (am_tablesync_worker())
5901 ereport(LOG,
5902 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5905 else if (am_sequencesync_worker())
5906 ereport(LOG,
5907 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5909 else
5910 ereport(LOG,
5911 errmsg("logical replication apply worker for subscription \"%s\" has started",
5913
5915
5916 /*
5917 * Register a callback to reset the origin state before aborting any
5918 * pending transaction during shutdown (see ShutdownPostgres()). This will
5919 * avoid origin advancement for an incomplete transaction which could
5920 * otherwise lead to its loss as such a transaction won't be sent by the
5921 * server again.
5922 *
5923 * Note that even a LOG or DEBUG statement placed after setting the origin
5924 * state may process a shutdown signal before committing the current apply
5925 * operation. So, it is important to register such a callback here.
5926 *
5927 * Register this callback here to ensure that all types of logical
5928 * replication workers that set up origins and apply remote transactions
5929 * are protected.
5930 */
5932}
static void apply_worker_exit(void)
Definition worker.c:5011
static void subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition worker.c:5210
MemoryContext ApplyContext
Definition worker.c:477
static bool MySubscriptionValid
Definition worker.c:485
static void set_wal_receiver_timeout(void)
Definition worker.c:5175
static void on_exit_clear_xact_state(int code, Datum arg)
Definition worker.c:5938
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:909
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition guc.c:4234
@ PGC_S_OVERRIDE
Definition guc.h:123
@ PGC_SUSET
Definition guc.h:78
@ PGC_BACKEND
Definition guc.h:77
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
MemoryContext cxt
static bool am_sequencesync_worker(void)

References AccessShareLock, ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), before_shmem_exit(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::cxt, LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg, fb(), get_rel_name(), GetSubscription(), LockSharedObject(), LOG, MemoryContextSetParent(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, on_exit_clear_xact_state(), PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, Subscription::retaindeadtuples, Subscription::retentionactive, set_wal_receiver_timeout(), SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), Subscription::synccommit, TopMemoryContext, TransactionIdIsValid, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ IsIndexUsableForFindingDeletedTuple()

static bool IsIndexUsableForFindingDeletedTuple ( Oid  localindexoid,
TransactionId  conflict_detection_xmin 
)
static

Definition at line 3242 of file worker.c.

3244{
3247
3249
3250 if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3251 elog(ERROR, "cache lookup failed for index %u", localindexoid);
3252
3253 /*
3254 * No need to check for a frozen transaction ID, as
3255 * TransactionIdPrecedes() manages it internally, treating it as falling
3256 * behind the conflict_detection_xmin.
3257 */
3259
3261
3263}
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References elog, ERROR, fb(), HeapTupleHeaderGetXmin(), HeapTupleIsValid, ObjectIdGetDatum(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdPrecedes().

Referenced by FindDeletedTupleInLocalRel().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 6074 of file worker.c.

6075{
6077}
bool IsLogicalWorker(void)
Definition worker.c:6065

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 6065 of file worker.c.

6066{
6067 return MyLogicalRepWorker != NULL;
6068}

References fb(), and MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3988 of file worker.c.

3989{
3991 bool ping_sent = false;
3992 TimeLineID tli;
3993 ErrorContextCallback errcallback;
3995
3996 /*
3997 * Init the ApplyMessageContext which we clean up after each replication
3998 * protocol message.
3999 */
4001 "ApplyMessageContext",
4003
4004 /*
4005 * This memory context is used for per-stream data when the streaming mode
4006 * is enabled. This context is reset on each stream stop.
4007 */
4009 "LogicalStreamingContext",
4011
4012 /* mark as idle, before starting to loop */
4014
4015 /*
4016 * Push apply error context callback. Fields will be filled while applying
4017 * a change.
4018 */
4019 errcallback.callback = apply_error_callback;
4020 errcallback.previous = error_context_stack;
4021 error_context_stack = &errcallback;
4023
4024 /* This outer loop iterates once per wait. */
4025 for (;;)
4026 {
4028 int rc;
4029 int len;
4030 char *buf = NULL;
4031 bool endofstream = false;
4032 long wait_time;
4033
4035
4037
4039
4040 if (len != 0)
4041 {
4042 /* Loop to process all available data (without blocking). */
4043 for (;;)
4044 {
4046
4047 if (len == 0)
4048 {
4049 break;
4050 }
4051 else if (len < 0)
4052 {
4053 ereport(LOG,
4054 (errmsg("data stream from publisher has ended")));
4055 endofstream = true;
4056 break;
4057 }
4058 else
4059 {
4060 int c;
4062
4064 {
4065 ConfigReloadPending = false;
4067 }
4068
4069 /* Reset timeout. */
4071 ping_sent = false;
4072
4073 rdt_data.last_recv_time = last_recv_timestamp;
4074
4075 /* Ensure we are reading the data into our memory context. */
4077
4079
4080 c = pq_getmsgbyte(&s);
4081
4082 if (c == PqReplMsg_WALData)
4083 {
4084 XLogRecPtr start_lsn;
4085 XLogRecPtr end_lsn;
4087
4088 start_lsn = pq_getmsgint64(&s);
4089 end_lsn = pq_getmsgint64(&s);
4091
4092 if (last_received < start_lsn)
4093 last_received = start_lsn;
4094
4095 if (last_received < end_lsn)
4096 last_received = end_lsn;
4097
4099
4100 apply_dispatch(&s);
4101
4103 }
4104 else if (c == PqReplMsg_Keepalive)
4105 {
4106 XLogRecPtr end_lsn;
4108 bool reply_requested;
4109
4110 end_lsn = pq_getmsgint64(&s);
4113
4114 if (last_received < end_lsn)
4115 last_received = end_lsn;
4116
4118
4120
4122 }
4123 else if (c == PqReplMsg_PrimaryStatusUpdate)
4124 {
4125 rdt_data.remote_lsn = pq_getmsgint64(&s);
4126 rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4128 rdt_data.reply_time = pq_getmsgint64(&s);
4129
4130 /*
4131 * This should never happen, see
4132 * ProcessStandbyPSRequestMessage. But if it happens
4133 * due to a bug, we don't want to proceed as it can
4134 * incorrectly advance oldest_nonremovable_xid.
4135 */
4136 if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4137 elog(ERROR, "cannot get the latest WAL position from the publisher");
4138
4140
4141 UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4142 }
4143 /* other message types are purposefully ignored */
4144
4146 }
4147
4149 }
4150 }
4151
4152 /* confirm all writes so far */
4153 send_feedback(last_received, false, false);
4154
4155 /* Reset the timestamp if no message was received */
4156 rdt_data.last_recv_time = 0;
4157
4159
4161 {
4162 /*
4163 * If we didn't get any transactions for a while there might be
4164 * unconsumed invalidation messages in the queue, consume them
4165 * now.
4166 */
4169
4170 /*
4171 * Process any relations that are being synchronized in parallel
4172 * and any newly added tables or sequences.
4173 */
4175 }
4176
4177 /* Cleanup the memory. */
4180
4181 /* Check if we need to exit the streaming loop. */
4182 if (endofstream)
4183 break;
4184
4185 /*
4186 * Wait for more data or latch. If we have unflushed transactions,
4187 * wake up after WalWriterDelay to see if they've been flushed yet (in
4188 * which case we should send a feedback message). Otherwise, there's
4189 * no particular urgency about waking up unless we get data or a
4190 * signal.
4191 */
4193 wait_time = WalWriterDelay;
4194 else
4195 wait_time = NAPTIME_PER_CYCLE;
4196
4197 /*
4198 * Ensure to wake up when it's possible to advance the non-removable
4199 * transaction ID, or when the retention duration may have exceeded
4200 * max_retention_duration.
4201 */
4203 {
4204 if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4205 rdt_data.xid_advance_interval)
4206 wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4207 else if (MySubscription->maxretention > 0)
4208 wait_time = Min(wait_time, MySubscription->maxretention);
4209 }
4210
4214 fd, wait_time,
4216
4217 if (rc & WL_LATCH_SET)
4218 {
4221 }
4222
4224 {
4225 ConfigReloadPending = false;
4227 }
4228
4229 if (rc & WL_TIMEOUT)
4230 {
4231 /*
4232 * We didn't receive anything new. If we haven't heard anything
4233 * from the server for more than wal_receiver_timeout / 2, ping
4234 * the server. Also, if it's been longer than
4235 * wal_receiver_status_interval since the last update we sent,
4236 * send a status update to the primary anyway, to report any
4237 * progress in applying WAL.
4238 */
4239 bool requestReply = false;
4240
4241 /*
4242 * Check if time since last receive from primary has reached the
4243 * configured limit.
4244 */
4245 if (wal_receiver_timeout > 0)
4246 {
4249
4250 timeout =
4253
4254 if (now >= timeout)
4255 ereport(ERROR,
4257 errmsg("terminating logical replication worker due to timeout")));
4258
4259 /* Check to see if it's time for a ping. */
4260 if (!ping_sent)
4261 {
4263 (wal_receiver_timeout / 2));
4264 if (now >= timeout)
4265 {
4266 requestReply = true;
4267 ping_sent = true;
4268 }
4269 }
4270 }
4271
4273
4275
4276 /*
4277 * Force reporting to ensure long idle periods don't lead to
4278 * arbitrarily delayed stats. Stats can only be reported outside
4279 * of (implicit or explicit) transactions. That shouldn't lead to
4280 * stats being delayed for long, because transactions are either
4281 * sent as a whole on commit or streamed. Streamed transactions
4282 * are spilled to disk and applied on commit.
4283 */
4284 if (!IsTransactionState())
4285 pgstat_report_stat(true);
4286 }
4287 }
4288
4289 /* Pop the error context stack */
4290 error_context_stack = errcallback.previous;
4292
4293 /* All done */
4295}
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition worker.c:3972
#define NAPTIME_PER_CYCLE
Definition worker.c:304
ErrorContextCallback * apply_error_context_stack
Definition worker.c:474
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition worker.c:4304
static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4394
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:482
void apply_error_callback(void *arg)
Definition worker.c:6222
static MemoryContext LogicalStreamingContext
Definition worker.c:480
uint64_t uint64
Definition c.h:625
ErrorContextCallback * error_context_stack
Definition elog.c:99
struct Latch * MyLatch
Definition globals.c:65
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
static char buf[DEFAULT_XLOG_SEG_SIZE]
int64 timestamp
int pgsocket
Definition port.h:29
#define PGINVALID_SOCKET
Definition port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
char * c
#define PqReplMsg_WALData
Definition protocol.h:77
#define PqReplMsg_Keepalive
Definition protocol.h:75
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
static FullTransactionId FullTransactionIdFromU64(uint64 value)
Definition transam.h:81
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
int wal_receiver_timeout
Definition walreceiver.c:91
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd)
int WalWriterDelay
Definition walwriter.c:71
uint32 TimeLineID
Definition xlogdefs.h:63

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, dlist_is_empty(), elog, ereport, errcode(), errmsg, ERROR, error_context_stack, fb(), fd(), FullTransactionIdFromU64(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, initReadOnlyStringInfo(), IsTransactionState(), len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, Subscription::maxretention, maybe_advance_nonremovable_xid(), maybe_reread_subscription(), MemoryContextReset(), MemoryContextSwitchTo(), Min, MyLatch, MySubscription, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), PqReplMsg_Keepalive, PqReplMsg_PrimaryStatusUpdate, PqReplMsg_WALData, ErrorContextCallback::previous, ProcessConfigFile(), ProcessSyncingRelations(), RDT_GET_CANDIDATE_XID, ResetLatch(), Subscription::retentionactive, send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, WL_TIMEOUT, and XLogRecPtrIsValid.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

◆ maybe_advance_nonremovable_xid()

static void maybe_advance_nonremovable_xid ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4394 of file worker.c.

4396{
4398 return;
4399
4401}
static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
Definition worker.c:4408

References can_advance_nonremovable_xid(), fb(), and process_rdt_phase_transition().

Referenced by LogicalRepApplyLoop().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 5045 of file worker.c.

5046{
5048 bool started_tx = false;
5049
5050 /* When cache state is valid there is nothing to do here. */
5052 return;
5053
5054 /* This function might be called inside or outside of transaction. */
5055 if (!IsTransactionState())
5056 {
5058 started_tx = true;
5059 }
5060
5062
5063 if (newsub)
5064 {
5066 }
5067 else
5068 {
5069 /*
5070 * Exit if the subscription was removed. This normally should not
5071 * happen as the worker gets killed during DROP SUBSCRIPTION.
5072 */
5073 ereport(LOG,
5074 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5075 MySubscription->name)));
5076
5077 /* Ensure we remove no-longer-useful entry for worker's start time */
5080
5081 proc_exit(0);
5082 }
5083
5084 /* Exit if the subscription was disabled. */
5085 if (!newsub->enabled)
5086 {
5087 ereport(LOG,
5088 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5089 MySubscription->name)));
5090
5092 }
5093
5094 /* !slotname should never happen when enabled is true. */
5095 Assert(newsub->slotname);
5096
5097 /* two-phase cannot be altered while the worker is running */
5098 Assert(newsub->twophasestate == MySubscription->twophasestate);
5099
5100 /*
5101 * Exit if any parameter that affects the remote connection was changed.
5102 * The launcher will start a new worker but note that the parallel apply
5103 * worker won't restart if the streaming option's value is changed from
5104 * 'parallel' to any other value or the server decides not to stream the
5105 * in-progress transaction.
5106 */
5107 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5108 strcmp(newsub->name, MySubscription->name) != 0 ||
5109 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5110 newsub->binary != MySubscription->binary ||
5111 newsub->stream != MySubscription->stream ||
5112 newsub->passwordrequired != MySubscription->passwordrequired ||
5113 strcmp(newsub->origin, MySubscription->origin) != 0 ||
5114 newsub->owner != MySubscription->owner ||
5115 !equal(newsub->publications, MySubscription->publications))
5116 {
5118 ereport(LOG,
5119 (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5120 MySubscription->name)));
5121 else
5122 ereport(LOG,
5123 (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5124 MySubscription->name)));
5125
5127 }
5128
5129 /*
5130 * Exit if the subscription owner's superuser privileges have been
5131 * revoked.
5132 */
5133 if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5134 {
5136 ereport(LOG,
5137 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5139 else
5140 ereport(LOG,
5141 errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5143
5145 }
5146
5147 /* Check for other changes that should never happen too. */
5148 if (newsub->dbid != MySubscription->dbid)
5149 {
5150 elog(ERROR, "subscription %u changed unexpectedly",
5152 }
5153
5154 /* Clean old subscription info and switch to new one. */
5157
5158 /* Change synchronous commit according to the user's wishes */
5159 SetConfigOption("synchronous_commit", MySubscription->synccommit,
5161
5162 /* Change wal_receiver_timeout according to the user's wishes */
5164
5165 if (started_tx)
5167
5168 MySubscriptionValid = true;
5169}
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
static color newsub(struct colormap *cm, color co)
Definition regc_color.c:389

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert, Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::cxt, Subscription::dbid, elog, equal(), ereport, errmsg, ERROR, fb(), GetSubscription(), IsTransactionState(), LOG, MemoryContextDelete(), MemoryContextSetParent(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, set_wal_receiver_timeout(), SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 6084 of file worker.c.

6085{
6089
6090 /*
6091 * Quick return if it's not requested to skip this transaction. This
6092 * function is called for every remote transaction and we assume that
6093 * skipping the transaction is not used often.
6094 */
6096 MySubscription->skiplsn != finish_lsn))
6097 return;
6098
6099 /* Start skipping all changes of this transaction */
6100 skip_xact_finish_lsn = finish_lsn;
6101
6102 ereport(LOG,
6103 errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6105}
static XLogRecPtr skip_xact_finish_lsn
Definition worker.c:521

References Assert, ereport, errmsg, in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsValid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ on_exit_clear_xact_state()

static void on_exit_clear_xact_state ( int  code,
Datum  arg 
)
static

Definition at line 5938 of file worker.c.

5939{
5941}
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377

References replorigin_xact_clear().

Referenced by InitializeLogRepWorker().

◆ process_rdt_phase_transition()

static void process_rdt_phase_transition ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4430 of file worker.c.

4432{
4433 switch (rdt_data->phase)
4434 {
4437 break;
4440 break;
4443 break;
4446 break;
4449 break;
4452 break;
4453 }
4454}
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data)
Definition worker.c:4621
static void get_candidate_xid(RetainDeadTuplesData *rdt_data)
Definition worker.c:4460
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received)
Definition worker.c:4562
static void request_publisher_status(RetainDeadTuplesData *rdt_data)
Definition worker.c:4523
static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4849
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4811

References fb(), get_candidate_xid(), RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, request_publisher_status(), resume_conflict_info_retention(), stop_conflict_info_retention(), wait_for_local_flush(), and wait_for_publisher_status().

Referenced by get_candidate_xid(), maybe_advance_nonremovable_xid(), wait_for_local_flush(), and wait_for_publisher_status().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char originname,
Size  szoriginname 
)

Definition at line 648 of file worker.c.

650{
651 if (OidIsValid(relid))
652 {
653 /* Replication origin name for tablesync workers. */
654 snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
655 }
656 else
657 {
658 /* Replication origin name for non-tablesync workers. */
660 }
661}

References fb(), OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), binary_upgrade_replorigin_advance(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), run_apply_worker(), and run_tablesync_worker().

◆ request_publisher_status()

static void request_publisher_status ( RetainDeadTuplesData rdt_data)
static

Definition at line 4523 of file worker.c.

4524{
4526
4527 if (!request_message)
4528 {
4530
4533 }
4534 else
4536
4537 /*
4538 * Send the current time to update the remote walsender's latest reply
4539 * message received time.
4540 */
4543
4544 elog(DEBUG2, "sending publisher status request message");
4545
4546 /* Send a request for the publisher status */
4548 request_message->data, request_message->len);
4549
4551
4552 /*
4553 * Skip calling maybe_advance_nonremovable_xid() since further transition
4554 * is possible only once we receive the publisher status message.
4555 */
4556}
#define DEBUG2
Definition elog.h:30
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
#define walrcv_send(conn, buffer, nbytes)

References ApplyContext, DEBUG2, elog, fb(), GetCurrentTimestamp(), LogRepWorkerWalRcvConn, makeStringInfo(), MemoryContextSwitchTo(), pq_sendbyte(), pq_sendint64(), PqReplMsg_PrimaryStatusRequest, RDT_WAIT_FOR_PUBLISHER_STATUS, resetStringInfo(), and walrcv_send.

Referenced by process_rdt_phase_transition().

◆ reset_apply_error_context_info()

◆ reset_retention_data_fields()

static void reset_retention_data_fields ( RetainDeadTuplesData rdt_data)
static

Definition at line 4928 of file worker.c.

4929{
4931 rdt_data->remote_lsn = InvalidXLogRecPtr;
4932 rdt_data->remote_oldestxid = InvalidFullTransactionId;
4933 rdt_data->remote_nextxid = InvalidFullTransactionId;
4934 rdt_data->reply_time = 0;
4935 rdt_data->remote_wait_for = InvalidFullTransactionId;
4936 rdt_data->candidate_xid = InvalidTransactionId;
4937 rdt_data->table_sync_wait_time = 0;
4938}
#define InvalidFullTransactionId
Definition transam.h:56

References fb(), InvalidFullTransactionId, InvalidTransactionId, InvalidXLogRecPtr, and RDT_GET_CANDIDATE_XID.

Referenced by stop_conflict_info_retention(), and wait_for_local_flush().

◆ resume_conflict_info_retention()

static void resume_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4849 of file worker.c.

4850{
4851 /* We can't resume retention without updating retention status. */
4852 if (!update_retention_status(true))
4853 return;
4854
4855 ereport(LOG,
4856 errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4859 ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4860 : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4861
4862 /*
4863 * Restart the worker to let the launcher initialize
4864 * oldest_nonremovable_xid at startup.
4865 *
4866 * While it's technically possible to derive this value on-the-fly using
4867 * the conflict detection slot's xmin, doing so risks a race condition:
4868 * the launcher might clean slot.xmin just after retention resumes. This
4869 * would make oldest_nonremovable_xid unreliable, especially during xid
4870 * wraparound.
4871 *
4872 * Although this can be prevented by introducing heavy weight locking, the
4873 * complexity it will bring doesn't seem worthwhile given how rarely
4874 * retention is resumed.
4875 */
4877}
static bool update_retention_status(bool active)
Definition worker.c:4889

References apply_worker_exit(), ereport, errdetail(), errmsg, LOG, Subscription::maxretention, MySubscription, Subscription::name, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ run_apply_worker()

static void run_apply_worker ( void  )
static

Definition at line 5666 of file worker.c.

5667{
5668 char originname[NAMEDATALEN];
5670 char *slotname = NULL;
5673 TimeLineID startpointTLI;
5674 char *err;
5675 bool must_use_password;
5676
5677 slotname = MySubscription->slotname;
5678
5679 /*
5680 * This shouldn't happen if the subscription is enabled, but guard against
5681 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5682 * slot is NULL.)
5683 */
5684 if (!slotname)
5685 ereport(ERROR,
5687 errmsg("subscription has no replication slot set")));
5688
5689 /* Setup replication origin tracking. */
5691 originname, sizeof(originname));
5694 if (!OidIsValid(originid))
5700
5701 /* Is the use of a password mandatory? */
5704
5706 true, must_use_password,
5708
5710 ereport(ERROR,
5712 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5713 MySubscription->name, err)));
5714
5715 /*
5716 * We don't really use the output identify_system for anything but it does
5717 * some initializations on the upstream so let's still call it.
5718 */
5720
5722
5724
5725 /*
5726 * Even when the two_phase mode is requested by the user, it remains as
5727 * the tri-state PENDING until all tablesyncs have reached READY state.
5728 * Only then, can it become ENABLED.
5729 *
5730 * Note: If the subscription has no tables then leave the state as
5731 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5732 * work.
5733 */
5736 {
5737 /* Start streaming with two_phase enabled */
5738 options.proto.logical.twophase = true;
5740
5742
5743 /*
5744 * Updating pg_subscription might involve TOAST table access, so
5745 * ensure we have a valid snapshot.
5746 */
5748
5753 }
5754 else
5755 {
5757 }
5758
5760 (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5763 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5764 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5765 "?")));
5766
5767 /* Run the main loop. */
5769}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition worker.c:5557
void start_apply(XLogRecPtr origin_startpos)
Definition worker.c:5626
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
void set_apply_error_context_origin(char *originname)
Definition worker.c:6364
void err(int eval, const char *fmt,...)
Definition err.c:43
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1353
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1156
#define NAMEDATALEN
bool AllTablesyncsReady(void)
Definition tablesync.c:1629
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1680
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_identify_system(conn, primary_tli)
uint16 ReplOriginId
Definition xlogdefs.h:69

References AllTablesyncsReady(), CommitTransactionCommand(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetTransactionSnapshot(), InvalidOid, InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, ReplOriginXactState::origin, Subscription::ownersuperuser, Subscription::passwordrequired, PopActiveSnapshot(), PushActiveSnapshot(), ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_setup(), replorigin_xact_state, set_apply_error_context_origin(), set_stream_options(), Subscription::slotname, start_apply(), StartTransactionCommand(), Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, and walrcv_startstreaming.

Referenced by ApplyWorkerMain().

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 4304 of file worker.c.

4305{
4306 static StringInfo reply_message = NULL;
4307 static TimestampTz send_time = 0;
4308
4311
4315 bool have_pending_txes;
4316
4317 /*
4318 * If the user doesn't want status to be reported to the publisher, be
4319 * sure to exit before doing anything at all.
4320 */
4321 if (!force && wal_receiver_status_interval <= 0)
4322 return;
4323
4324 /* It's legal to not pass a recvpos */
4325 if (recvpos < last_recvpos)
4327
4329
4330 /*
4331 * No outstanding transactions to flush, we can report the latest received
4332 * position. This is important for synchronous replication.
4333 */
4334 if (!have_pending_txes)
4336
4337 if (writepos < last_writepos)
4339
4340 if (flushpos < last_flushpos)
4342
4344
4345 /* if we've already reported everything we're good */
4346 if (!force &&
4351 return;
4352 send_time = now;
4353
4354 if (!reply_message)
4355 {
4357
4360 }
4361 else
4363
4365 pq_sendint64(reply_message, recvpos); /* write */
4366 pq_sendint64(reply_message, flushpos); /* flush */
4367 pq_sendint64(reply_message, writepos); /* apply */
4368 pq_sendint64(reply_message, now); /* sendTime */
4369 pq_sendbyte(reply_message, requestReply); /* replyRequested */
4370
4371 elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4372 force,
4376
4379
4380 if (recvpos > last_recvpos)
4382 if (writepos > last_writepos)
4384 if (flushpos > last_flushpos)
4386}
static XLogRecPtr last_flushpos
Definition worker.c:532
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition worker.c:3902
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static StringInfoData reply_message

References ApplyContext, StringInfoData::data, DEBUG2, elog, fb(), get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, last_flushpos, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), PqReplMsg_StandbyStatusUpdate, reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char originname)

Definition at line 6364 of file worker.c.

6365{
6367 originname);
6368}
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768

References apply_error_callback_arg, ApplyContext, fb(), MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_xact()

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions options,
char slotname,
XLogRecPtr origin_startpos 
)

Definition at line 5557 of file worker.c.

5560{
5561 int server_version;
5562
5563 options->logical = true;
5564 options->startpoint = *origin_startpos;
5565 options->slotname = slotname;
5566
5568 options->proto.logical.proto_version =
5573
5574 options->proto.logical.publication_names = MySubscription->publications;
5575 options->proto.logical.binary = MySubscription->binary;
5576
5577 /*
5578 * Assign the appropriate option value for streaming option according to
5579 * the 'streaming' mode and the publisher's ability to support that mode.
5580 */
5581 if (server_version >= 160000 &&
5583 {
5584 options->proto.logical.streaming_str = "parallel";
5586 }
5587 else if (server_version >= 140000 &&
5589 {
5590 options->proto.logical.streaming_str = "on";
5592 }
5593 else
5594 {
5595 options->proto.logical.streaming_str = NULL;
5597 }
5598
5599 options->proto.logical.twophase = false;
5600 options->proto.logical.origin = pstrdup(MySubscription->origin);
5601}
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_VERSION_NUM
char * pstrdup(const char *in)
Definition mcxt.c:1781
static int server_version
Definition pg_dumpall.c:122
#define walrcv_server_version(conn)

References Subscription::binary, fb(), LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ set_wal_receiver_timeout()

static void set_wal_receiver_timeout ( void  )
static

Definition at line 5175 of file worker.c.

5176{
5177 bool parsed;
5178 int val;
5180
5181 /*
5182 * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5183 * which comes from the subscription's wal_receiver_timeout option. If the
5184 * value is -1, reset the GUC to its default, meaning it will inherit from
5185 * the server config, command line, or role/database settings.
5186 */
5188 if (parsed && val == -1)
5189 SetConfigOption("wal_receiver_timeout", NULL,
5191 else
5192 SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5194
5195 /*
5196 * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5197 * message when it changes, to verify it was set correctly.
5198 */
5200 elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5202}
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
@ PGC_S_SESSION
Definition guc.h:126
long val
Definition informix.c:689

References DEBUG1, elog, fb(), MySubscription, Subscription::name, parse_int(), PGC_BACKEND, PGC_S_SESSION, SetConfigOption(), val, wal_receiver_timeout, and Subscription::walrcvtimeout.

Referenced by InitializeLogRepWorker(), and maybe_reread_subscription().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 5947 of file worker.c.

5948{
5949 /* Attach to slot */
5951
5953
5954 /* Setup signal handling */
5957
5958 /*
5959 * We don't currently need any ResourceOwner in a walreceiver process, but
5960 * if we did, we could call CreateAuxProcessResourceOwner here.
5961 */
5962
5963 /* Initialise stats to a sanish value */
5966
5967 /* Load the libpq-specific functions */
5968 load_file("libpqwalreceiver", false);
5969
5971
5972 /* Connect to the origin and start the replication. */
5973 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5975
5976 /*
5977 * Setup callback for syscache so that we know when something changes in
5978 * the subscription relation state.
5979 */
5982 (Datum) 0);
5983}
void InitializeLogRepWorker(void)
Definition worker.c:5779
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:949
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition launcher.c:767
#define pqsignal
Definition port.h:547
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define SIGHUP
Definition win32_port.h:158

References am_leader_apply_worker(), am_sequencesync_worker(), am_tablesync_worker(), Assert, BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, elog, fb(), GetCurrentTimestamp(), InitializeLogRepWorker(), InvalidateSyncingRelStates(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal, LogicalRepWorker::reply_time, SIGHUP, and SignalHandlerForConfigReload().

Referenced by ApplyWorkerMain(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 688 of file worker.c.

689{
690 switch (MyLogicalRepWorker->type)
691 {
693 return MyLogicalRepWorker->relid == rel->localreloid;
694
696 /* We don't synchronize rel's that are in unknown state. */
697 if (rel->state != SUBREL_STATE_READY &&
701 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
703 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
704
705 return rel->state == SUBREL_STATE_READY;
706
707 case WORKERTYPE_APPLY:
708 return (rel->state == SUBREL_STATE_READY ||
709 (rel->state == SUBREL_STATE_SYNCDONE &&
710 rel->statelsn <= remote_final_lsn));
711
713 /* Should never happen. */
714 elog(ERROR, "sequence synchronization worker is not expected to apply changes");
715 break;
716
718 /* Should never happen. */
719 elog(ERROR, "Unknown worker type");
720 }
721
722 return false; /* dummy for compiler */
723}
LogicalRepWorkerType type
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY

References elog, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, LogicalRepRelMapEntry::statelsn, LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ should_stop_conflict_info_retention()

static bool should_stop_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4776 of file worker.c.

4777{
4779
4780 Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4783
4785 return false;
4786
4787 /*
4788 * Use last_recv_time when applying changes in the loop to avoid
4789 * unnecessary system time retrieval. If last_recv_time is not available,
4790 * obtain the current timestamp.
4791 */
4792 now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4793
4794 /*
4795 * Return early if the wait time has not exceeded the configured maximum
4796 * (max_retention_duration). Time spent waiting for table synchronization
4797 * is excluded from this calculation, as it occurs infrequently.
4798 */
4799 if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4801 rdt_data->table_sync_wait_time))
4802 return false;
4803
4804 return true;
4805}

References Assert, fb(), GetCurrentTimestamp(), Subscription::maxretention, MySubscription, now(), RDT_WAIT_FOR_LOCAL_FLUSH, RDT_WAIT_FOR_PUBLISHER_STATUS, TimestampDifferenceExceeds(), and TransactionIdIsValid.

Referenced by wait_for_local_flush(), and wait_for_publisher_status().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 966 of file worker.c.

968{
970 int num_phys_attrs = desc->natts;
971 int i;
972 int attnum,
973 num_defaults = 0;
974 int *defmap;
975 ExprState **defexprs;
976 ExprContext *econtext;
977
978 econtext = GetPerTupleExprContext(estate);
979
980 /* We got all the data via replication, no need to evaluate anything. */
981 if (num_phys_attrs == rel->remoterel.natts)
982 return;
983
984 defmap = palloc_array(int, num_phys_attrs);
986
988 for (attnum = 0; attnum < num_phys_attrs; attnum++)
989 {
991 Expr *defexpr;
992
993 if (cattr->attisdropped || cattr->attgenerated)
994 continue;
995
996 if (rel->attrmap->attnums[attnum] >= 0)
997 continue;
998
999 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
1000
1001 if (defexpr != NULL)
1002 {
1003 /* Run the expression through planner */
1004 defexpr = expression_planner(defexpr);
1005
1006 /* Initialize executable expression in copycontext */
1007 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1008 defmap[num_defaults] = attnum;
1009 num_defaults++;
1010 }
1011 }
1012
1013 for (i = 0; i < num_defaults; i++)
1014 slot->tts_values[defmap[i]] =
1015 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1016}
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition execExpr.c:143
#define GetPerTupleExprContext(estate)
Definition executor.h:667
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition executor.h:403
#define palloc_array(type, count)
Definition fe_memutils.h:76
int16 attnum
Expr * expression_planner(Expr *expr)
Definition planner.c:6839
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition attmap.h:37
bool * tts_isnull
Definition tuptable.h:133
Datum * tts_values
Definition tuptable.h:131

References Assert, attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), fb(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc_array, RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescCompactAttr().

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 1125 of file worker.c.

1128{
1129 int natts = slot->tts_tupleDescriptor->natts;
1130 int i;
1131
1132 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1133 ExecClearTuple(slot);
1134
1135 /*
1136 * Copy all the column data from srcslot, so that we'll have valid values
1137 * for unreplaced columns.
1138 */
1139 Assert(natts == srcslot->tts_tupleDescriptor->natts);
1141 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1142 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1143
1144 /* Call the "in" function for each replaced attribute */
1145 Assert(natts == rel->attrmap->maplen);
1146 for (i = 0; i < natts; i++)
1147 {
1149 int remoteattnum = rel->attrmap->attnums[i];
1150
1151 if (remoteattnum < 0)
1152 continue;
1153
1155
1157 {
1159
1160 /* Set attnum for error callback */
1162
1164 {
1165 Oid typinput;
1166 Oid typioparam;
1167
1168 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1169 slot->tts_values[i] =
1171 typioparam, att->atttypmod);
1172 slot->tts_isnull[i] = false;
1173 }
1174 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1175 {
1176 Oid typreceive;
1177 Oid typioparam;
1178
1179 /*
1180 * In some code paths we may be asked to re-parse the same
1181 * tuple data. Reset the StringInfo's cursor so that works.
1182 */
1183 colvalue->cursor = 0;
1184
1185 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1186 slot->tts_values[i] =
1187 OidReceiveFunctionCall(typreceive, colvalue,
1188 typioparam, att->atttypmod);
1189
1190 /* Trouble if it didn't eat the whole buffer */
1191 if (colvalue->cursor != colvalue->len)
1192 ereport(ERROR,
1194 errmsg("incorrect binary data format in logical replication column %d",
1195 remoteattnum + 1)));
1196 slot->tts_isnull[i] = false;
1197 }
1198 else
1199 {
1200 /* must be LOGICALREP_COLUMN_NULL */
1201 slot->tts_values[i] = (Datum) 0;
1202 slot->tts_isnull[i] = true;
1203 }
1204
1205 /* Reset attnum for error callback */
1207 }
1208 }
1209
1210 /* And finally, declare that "slot" contains a valid virtual tuple */
1212}
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition fmgr.c:1773
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition fmgr.c:1755
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition lsyscache.c:3096
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition lsyscache.c:3162
FormData_pg_attribute * Form_pg_attribute
TupleDesc tts_tupleDescriptor
Definition tuptable.h:129
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ereport, errcode(), errmsg, ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), fb(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, memcpy(), TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 1024 of file worker.c.

1026{
1027 int natts = slot->tts_tupleDescriptor->natts;
1028 int i;
1029
1030 ExecClearTuple(slot);
1031
1032 /* Call the "in" function for each non-dropped, non-null attribute */
1033 Assert(natts == rel->attrmap->maplen);
1034 for (i = 0; i < natts; i++)
1035 {
1037 int remoteattnum = rel->attrmap->attnums[i];
1038
1039 if (!att->attisdropped && remoteattnum >= 0)
1040 {
1042
1044
1045 /* Set attnum for error callback */
1047
1049 {
1050 Oid typinput;
1051 Oid typioparam;
1052
1053 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1054 slot->tts_values[i] =
1056 typioparam, att->atttypmod);
1057 slot->tts_isnull[i] = false;
1058 }
1059 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1060 {
1061 Oid typreceive;
1062 Oid typioparam;
1063
1064 /*
1065 * In some code paths we may be asked to re-parse the same
1066 * tuple data. Reset the StringInfo's cursor so that works.
1067 */
1068 colvalue->cursor = 0;
1069
1070 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1071 slot->tts_values[i] =
1072 OidReceiveFunctionCall(typreceive, colvalue,
1073 typioparam, att->atttypmod);
1074
1075 /* Trouble if it didn't eat the whole buffer */
1076 if (colvalue->cursor != colvalue->len)
1077 ereport(ERROR,
1079 errmsg("incorrect binary data format in logical replication column %d",
1080 remoteattnum + 1)));
1081 slot->tts_isnull[i] = false;
1082 }
1083 else
1084 {
1085 /*
1086 * NULL value from remote. (We don't expect to see
1087 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1088 * NULL.)
1089 */
1090 slot->tts_values[i] = (Datum) 0;
1091 slot->tts_isnull[i] = true;
1092 }
1093
1094 /* Reset attnum for error callback */
1096 }
1097 else
1098 {
1099 /*
1100 * We assign NULL to dropped attributes and missing values
1101 * (missing values should be later filled using
1102 * slot_fill_defaults).
1103 */
1104 slot->tts_values[i] = (Datum) 0;
1105 slot->tts_isnull[i] = true;
1106 }
1107 }
1108
1110}

References apply_error_callback_arg, Assert, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, ereport, errcode(), errmsg, ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), fb(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr().

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_tuple_routing(), apply_handle_update(), and apply_handle_update_internal().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 5626 of file worker.c.

5627{
5628 PG_TRY();
5629 {
5631 }
5632 PG_CATCH();
5633 {
5634 /*
5635 * Reset the origin state to prevent the advancement of origin
5636 * progress if we fail to apply. Otherwise, this will result in
5637 * transaction loss as that transaction won't be sent again by the
5638 * server.
5639 */
5641
5644 else
5645 {
5646 /*
5647 * Report the worker failed while applying changes. Abort the
5648 * current transaction so that the stats message is sent in an
5649 * idle state.
5650 */
5653
5654 PG_RE_THROW();
5655 }
5656 }
5657 PG_END_TRY();
5658}
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition worker.c:3988
void DisableSubscriptionAndExit(void)
Definition worker.c:6007
#define PG_RE_THROW()
Definition elog.h:407
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_CATCH(...)
Definition elog.h:384

References AbortOutOfAnyTransaction(), Subscription::disableonerr, DisableSubscriptionAndExit(), fb(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), and replorigin_xact_clear().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ stop_conflict_info_retention()

static void stop_conflict_info_retention ( RetainDeadTuplesData rdt_data)
static

Definition at line 4811 of file worker.c.

4812{
4813 /* Stop retention if not yet */
4815 {
4816 /*
4817 * If the retention status cannot be updated (e.g., due to active
4818 * transaction), skip further processing to avoid inconsistent
4819 * retention behavior.
4820 */
4821 if (!update_retention_status(false))
4822 return;
4823
4827
4828 ereport(LOG,
4829 errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4831 errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4832 }
4833
4835
4836 /*
4837 * If retention has been stopped, reset to the initial phase to retry
4838 * resuming retention. This reset is required to recalculate the current
4839 * wait time and resume retention if the time falls within
4840 * max_retention_duration.
4841 */
4843}
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
Definition worker.c:4928

References Assert, ereport, errdetail(), errmsg, fb(), InvalidTransactionId, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::relmutex, reset_retention_data_fields(), Subscription::retentionactive, SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, and update_retention_status().

Referenced by process_rdt_phase_transition().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 6111 of file worker.c.

6112{
6113 if (!is_skipping_changes())
6114 return;
6115
6116 ereport(LOG,
6117 errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6119
6120 /* Stop skipping changes */
6122}

References ereport, errmsg, InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3946 of file worker.c.

3947{
3949
3950 /*
3951 * Skip for parallel apply workers, because the lsn_mapping is maintained
3952 * by the leader apply worker.
3953 */
3955 return;
3956
3957 /* Need to do this in permanent context */
3959
3960 /* Track commit lsn */
3962 flushpos->local_end = local_lsn;
3963 flushpos->remote_end = remote_lsn;
3964
3967}
#define palloc_object(type)
Definition fe_memutils.h:74
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition ilist.h:364

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), fb(), lsn_mapping, MemoryContextSwitchTo(), and palloc_object.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_abort_internal()

static void stream_abort_internal ( TransactionId  xid,
TransactionId  subxid 
)
static

Definition at line 1995 of file worker.c.

1996{
1997 /*
1998 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1999 * just delete the files with serialized info.
2000 */
2001 if (xid == subxid)
2003 else
2004 {
2005 /*
2006 * OK, so it's a subxact. We need to read the subxact file for the
2007 * toplevel transaction, determine the offset tracked for the subxact,
2008 * and truncate the file with changes. We also remove the subxacts
2009 * with higher offsets (or rather higher XIDs).
2010 *
2011 * We intentionally scan the array from the tail, because we're likely
2012 * aborting a change for the most recent subtransactions.
2013 *
2014 * We can't use the binary search here as subxact XIDs won't
2015 * necessarily arrive in sorted order, consider the case where we have
2016 * released the savepoint for multiple subtransactions and then
2017 * performed rollback to savepoint for one of the earlier
2018 * sub-transaction.
2019 */
2020 int64 i;
2021 int64 subidx;
2022 BufFile *fd;
2023 bool found = false;
2024 char path[MAXPGPATH];
2025
2026 subidx = -1;
2029
2030 for (i = subxact_data.nsubxacts; i > 0; i--)
2031 {
2032 if (subxact_data.subxacts[i - 1].xid == subxid)
2033 {
2034 subidx = (i - 1);
2035 found = true;
2036 break;
2037 }
2038 }
2039
2040 /*
2041 * If it's an empty sub-transaction then we will not find the subxid
2042 * here so just cleanup the subxact info and return.
2043 */
2044 if (!found)
2045 {
2046 /* Cleanup the subxact info */
2050 return;
2051 }
2052
2053 /* open the changes file */
2056 O_RDWR, false);
2057
2058 /* OK, truncate the file at the right offset */
2062
2063 /* discard the subxacts added later */
2065
2066 /* write the updated subxact list */
2068
2071 }
2072}
static void cleanup_subxact_info(void)
Definition worker.c:5607
static void subxact_info_write(Oid subid, TransactionId xid)
Definition worker.c:5225
static void subxact_info_read(Oid subid, TransactionId xid)
Definition worker.c:5274
void BufFileTruncateFileSet(BufFile *file, int fileno, pgoff_t offset)
Definition buffile.c:928
int64_t int64
Definition c.h:621
TransactionId xid
Definition worker.c:536
pgoff_t offset
Definition worker.c:538
int fileno
Definition worker.c:537

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), fb(), fd(), SubXactInfo::fileno, i, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_handle_stream_abort().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 5424 of file worker.c.

5425{
5426 char path[MAXPGPATH];
5427
5428 /* Delete the changes file. */
5429 changes_filename(path, subid, xid);
5431
5432 /* Delete the subxact file, if it exists. */
5433 subxact_filename(path, subid, xid);
5435}
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition worker.c:5403
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition buffile.c:365

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_close_file()

static void stream_close_file ( void  )
static

◆ stream_open_and_write_change()

static void stream_open_and_write_change ( TransactionId  xid,
char  action,
StringInfo  s 
)
static

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first_segment 
)
static

Definition at line 5448 of file worker.c.

5449{
5450 char path[MAXPGPATH];
5452
5453 Assert(OidIsValid(subid));
5455 Assert(stream_fd == NULL);
5456
5457
5458 changes_filename(path, subid, xid);
5459 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5460
5461 /*
5462 * Create/open the buffiles under the logical streaming context so that we
5463 * have those files until stream stop.
5464 */
5466
5467 /*
5468 * If this is the first streamed segment, create the changes file.
5469 * Otherwise, just open the file for writing, in append mode.
5470 */
5471 if (first_segment)
5473 path);
5474 else
5475 {
5476 /*
5477 * Open the file and seek to the end of the file because we always
5478 * append the changes file.
5479 */
5481 path, O_RDWR, false);
5483 }
5484
5486}
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition buffile.c:268

References Assert, BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, fb(), LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by stream_start_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1694 of file worker.c.

1695{
1697
1698 /*
1699 * Initialize the worker's stream_fileset if we haven't yet. This will be
1700 * used for the entire duration of the worker so create it in a permanent
1701 * context. We create this on the very first streaming message from any
1702 * transaction and then use it for this and other streaming transactions.
1703 * Now, we could create a fileset at the start of the worker as well but
1704 * then we won't be sure that it will ever be used.
1705 */
1707 {
1709
1711
1714
1716 }
1717
1718 /* Open the spool file for this transaction. */
1720
1721 /* If this is not the first segment, open existing subxact file. */
1722 if (!first_segment)
1724
1726}
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition worker.c:5448
void FileSetInit(FileSet *fileset)
Definition fileset.c:52

References ApplyContext, begin_replication_step(), end_replication_step(), fb(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc_object, LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1869 of file worker.c.

1870{
1871 /*
1872 * Serialize information about subxacts for the toplevel transaction, then
1873 * close the stream messages spool file.
1874 */
1877
1878 /* We must be in a valid transaction state */
1880
1881 /* Commit the per-stream transaction */
1883
1884 /* Reset per-stream context */
1886}

References Assert, CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 5511 of file worker.c.

5512{
5513 int len;
5514
5515 Assert(stream_fd != NULL);
5516
5517 /* total on-disk size, including the action type character */
5518 len = (s->len - s->cursor) + sizeof(char);
5519
5520 /* first write the size */
5521 BufFileWrite(stream_fd, &len, sizeof(len));
5522
5523 /* then the action */
5524 BufFileWrite(stream_fd, &action, sizeof(action));
5525
5526 /* and finally the remaining part of the buffer (after the XID) */
5527 len = (s->len - s->cursor);
5528
5530}
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
Definition buffile.c:677

References Assert, BufFileWrite(), StringInfoData::cursor, StringInfoData::data, fb(), StringInfoData::len, len, and stream_fd.

Referenced by apply_handle_stream_start(), apply_handle_stream_stop(), handle_streamed_transaction(), and stream_open_and_write_change().

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
SysCacheIdentifier  cacheid,
uint32  hashvalue 
)
static

Definition at line 5210 of file worker.c.

5211{
5212 MySubscriptionValid = false;
5213}

References MySubscriptionValid.

Referenced by InitializeLogRepWorker().

◆ subxact_filename()

static void subxact_filename ( char path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 5403 of file worker.c.

5404{
5405 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5406}

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 5325 of file worker.c.

5326{
5327 SubXactInfo *subxacts = subxact_data.subxacts;
5328 int64 i;
5329
5330 /* We must have a valid top level stream xid and a stream fd. */
5332 Assert(stream_fd != NULL);
5333
5334 /*
5335 * If the XID matches the toplevel transaction, we don't want to add it.
5336 */
5337 if (stream_xid == xid)
5338 return;
5339
5340 /*
5341 * In most cases we're checking the same subxact as we've already seen in
5342 * the last call, so make sure to ignore it (this change comes later).
5343 */
5344 if (subxact_data.subxact_last == xid)
5345 return;
5346
5347 /* OK, remember we're processing this XID. */
5349
5350 /*
5351 * Check if the transaction is already present in the array of subxact. We
5352 * intentionally scan the array from the tail, because we're likely adding
5353 * a change for the most recent subtransactions.
5354 *
5355 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5356 * would allow us to use binary search here.
5357 */
5358 for (i = subxact_data.nsubxacts; i > 0; i--)
5359 {
5360 /* found, so we're done */
5361 if (subxacts[i - 1].xid == xid)
5362 return;
5363 }
5364
5365 /* This is a new subxact, so we need to add it to the array. */
5366 if (subxact_data.nsubxacts == 0)
5367 {
5369
5371
5372 /*
5373 * Allocate this memory for subxacts in per-stream context, see
5374 * subxact_info_read.
5375 */
5379 }
5381 {
5383 subxacts = repalloc_array(subxacts, SubXactInfo,
5385 }
5386
5387 subxacts[subxact_data.nsubxacts].xid = xid;
5388
5389 /*
5390 * Get the current offset of the stream file and store it as offset of
5391 * this subxact.
5392 */
5394 &subxacts[subxact_data.nsubxacts].fileno,
5395 &subxacts[subxact_data.nsubxacts].offset);
5396
5398 subxact_data.subxacts = subxacts;
5399}
#define repalloc_array(pointer, type, count)
Definition fe_memutils.h:78

References Assert, BufFileTell(), fb(), SubXactInfo::fileno, i, LogicalStreamingContext, MemoryContextSwitchTo(), ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, SubXactInfo::offset, palloc_array, repalloc_array, stream_fd, stream_xid, subxact_data, ApplySubXactData::subxact_last, ApplySubXactData::subxacts, TransactionIdIsValid, and SubXactInfo::xid.

Referenced by handle_streamed_transaction().

◆ subxact_info_read()

static void subxact_info_read ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5274 of file worker.c.

5275{
5276 char path[MAXPGPATH];
5277 Size len;
5278 BufFile *fd;
5280
5284
5285 /*
5286 * If the subxact file doesn't exist that means we don't have any subxact
5287 * info.
5288 */
5289 subxact_filename(path, subid, xid);
5291 true);
5292 if (fd == NULL)
5293 return;
5294
5295 /* read number of subxact items */
5297
5299
5300 /* we keep the maximum as a power of 2 */
5302
5303 /*
5304 * Allocate subxact information in the logical streaming context. We need
5305 * this information during the complete stream so that we can add the sub
5306 * transaction info to this. On stream stop we will flush this information
5307 * to the subxact file and reset the logical streaming context.
5308 */
5313
5314 if (len > 0)
5316
5318}
size_t Size
Definition c.h:689
static uint32 pg_ceil_log2_32(uint32 num)

References Assert, BufFileClose(), BufFileOpenFileSet(), BufFileReadExact(), fb(), fd(), len, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, palloc_array, pg_ceil_log2_32(), LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and stream_start_internal().

◆ subxact_info_write()

static void subxact_info_write ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 5225 of file worker.c.

5226{
5227 char path[MAXPGPATH];
5228 Size len;
5229 BufFile *fd;
5230
5232
5233 /* construct the subxact filename */
5234 subxact_filename(path, subid, xid);
5235
5236 /* Delete the subxacts file, if exists. */
5237 if (subxact_data.nsubxacts == 0)
5238 {
5241
5242 return;
5243 }
5244
5245 /*
5246 * Create the subxact file if it not already created, otherwise open the
5247 * existing file.
5248 */
5250 true);
5251 if (fd == NULL)
5253
5255
5256 /* Write the subxact count and subxact info */
5259
5261
5262 /* free the memory allocated for subxact info */
5264}

References Assert, BufFileClose(), BufFileCreateFileSet(), BufFileDeleteFileSet(), BufFileOpenFileSet(), BufFileWrite(), cleanup_subxact_info(), fb(), fd(), len, MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, LogicalRepWorker::stream_fileset, subxact_data, subxact_filename(), ApplySubXactData::subxacts, and TransactionIdIsValid.

Referenced by stream_abort_internal(), and stream_stop_internal().

◆ TargetPrivilegesCheck()

static void TargetPrivilegesCheck ( Relation  rel,
AclMode  mode 
)
static

Definition at line 2608 of file worker.c.

2609{
2610 Oid relid;
2612
2613 relid = RelationGetRelid(rel);
2615 if (aclresult != ACLCHECK_OK)
2617 get_relkind_objtype(rel->rd_rel->relkind),
2618 get_rel_name(relid));
2619
2620 /*
2621 * We lack the infrastructure to honor RLS policies. It might be possible
2622 * to add such infrastructure here, but tablesync workers lack it, too, so
2623 * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2624 * but it seems dangerous to replicate a TRUNCATE and then refuse to
2625 * replicate subsequent INSERTs, so we forbid all commands the same.
2626 */
2627 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2628 ereport(ERROR,
2630 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2633}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4082
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
ObjectType get_relkind_objtype(char relkind)
static PgChecksumMode mode
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45

References aclcheck_error(), ACLCHECK_OK, check_enable_rls(), ereport, errcode(), errmsg, ERROR, fb(), get_rel_name(), get_relkind_objtype(), GetUserId(), GetUserNameFromId(), InvalidOid, mode, pg_class_aclcheck(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, and RLS_ENABLED.

Referenced by apply_handle_delete_internal(), apply_handle_insert_internal(), apply_handle_truncate(), apply_handle_tuple_routing(), apply_handle_update_internal(), and FindReplTupleInLocalRel().

◆ update_retention_status()

static bool update_retention_status ( bool  active)
static

Definition at line 4889 of file worker.c.

4890{
4891 /*
4892 * Do not update the catalog during an active transaction. The transaction
4893 * may be started during change application, leading to a possible
4894 * rollback of catalog updates if the application fails subsequently.
4895 */
4896 if (IsTransactionState())
4897 return false;
4898
4900
4901 /*
4902 * Updating pg_subscription might involve TOAST table access, so ensure we
4903 * have a valid snapshot.
4904 */
4906
4907 /* Update pg_subscription.subretentionactive */
4909
4912
4913 /* Notify launcher to update the conflict slot */
4915
4917
4918 return true;
4919}
void ApplyLauncherWakeup(void)
Definition launcher.c:1195
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)

References ApplyLauncherWakeup(), CommitTransactionCommand(), GetTransactionSnapshot(), IsTransactionState(), MySubscription, Subscription::oid, PopActiveSnapshot(), PushActiveSnapshot(), Subscription::retentionactive, StartTransactionCommand(), and UpdateDeadTupleRetentionStatus().

Referenced by resume_conflict_info_retention(), and stop_conflict_info_retention().

◆ UpdateWorkerStats()

◆ wait_for_local_flush()

static void wait_for_local_flush ( RetainDeadTuplesData rdt_data)
static

Definition at line 4621 of file worker.c.

4622{
4623 Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4624 TransactionIdIsValid(rdt_data->candidate_xid));
4625
4626 /*
4627 * We expect the publisher and subscriber clocks to be in sync using time
4628 * sync service like NTP. Otherwise, we will advance this worker's
4629 * oldest_nonremovable_xid prematurely, leading to the removal of rows
4630 * required to detect update_deleted reliably. This check primarily
4631 * addresses scenarios where the publisher's clock falls behind; if the
4632 * publisher's clock is ahead, subsequent transactions will naturally bear
4633 * later commit timestamps, conforming to the design outlined atop
4634 * worker.c.
4635 *
4636 * XXX Consider waiting for the publisher's clock to catch up with the
4637 * subscriber's before proceeding to the next phase.
4638 */
4639 if (TimestampDifferenceExceeds(rdt_data->reply_time,
4640 rdt_data->candidate_xid_time, 0))
4641 ereport(ERROR,
4642 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4643 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4644
4645 /*
4646 * Do not attempt to advance the non-removable transaction ID when table
4647 * sync is in progress. During this time, changes from a single
4648 * transaction may be applied by multiple table sync workers corresponding
4649 * to the target tables. So, it's necessary for all table sync workers to
4650 * apply and flush the corresponding changes before advancing the
4651 * transaction ID, otherwise, dead tuples that are still needed for
4652 * conflict detection in table sync workers could be removed prematurely.
4653 * However, confirming the apply and flush progress across all table sync
4654 * workers is complex and not worth the effort, so we simply return if not
4655 * all tables are in the READY state.
4656 *
4657 * Advancing the transaction ID is necessary even when no tables are
4658 * currently subscribed, to avoid retaining dead tuples unnecessarily.
4659 * While it might seem safe to skip all phases and directly assign
4660 * candidate_xid to oldest_nonremovable_xid during the
4661 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4662 * concurrently add tables to the subscription, the apply worker may not
4663 * process invalidations in time. Consequently,
4664 * HasSubscriptionTablesCached() might miss the new tables, leading to
4665 * premature advancement of oldest_nonremovable_xid.
4666 *
4667 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4668 * invalidations are guaranteed to be processed before applying changes
4669 * from newly added tables while waiting for the local flush to reach
4670 * remote_lsn.
4671 *
4672 * Additionally, even if we check for subscription tables during
4673 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4674 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4675 * subscription tables at this stage to prevent unnecessary tuple
4676 * retention.
4677 */
4679 {
4681
4682 now = rdt_data->last_recv_time
4683 ? rdt_data->last_recv_time : GetCurrentTimestamp();
4684
4685 /*
4686 * Record the time spent waiting for table sync, it is needed for the
4687 * timeout check in should_stop_conflict_info_retention().
4688 */
4689 rdt_data->table_sync_wait_time =
4690 TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4691
4692 return;
4693 }
4694
4695 /*
4696 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4697 * retaining conflict information for this worker.
4698 */
4700 {
4702 return;
4703 }
4704
4705 /*
4706 * Update and check the remote flush position if we are applying changes
4707 * in a loop. This is done at most once per WalWriterDelay to avoid
4708 * performing costly operations in get_flush_position() too frequently
4709 * during change application.
4710 */
4711 if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4712 TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4713 rdt_data->last_recv_time, WalWriterDelay))
4714 {
4717 bool have_pending_txes;
4718
4719 /* Fetch the latest remote flush position */
4721
4722 if (flushpos > last_flushpos)
4724
4725 rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4726 }
4727
4728 /* Return to wait for the changes to be applied */
4729 if (last_flushpos < rdt_data->remote_lsn)
4730 return;
4731
4732 /*
4733 * Reaching this point implies should_stop_conflict_info_retention()
4734 * returned false earlier, meaning that the most recent duration for
4735 * advancing the non-removable transaction ID is within the
4736 * max_retention_duration or max_retention_duration is set to 0.
4737 *
4738 * Therefore, if conflict info retention was previously stopped due to a
4739 * timeout, it is now safe to resume retention.
4740 */
4742 {
4744 return;
4745 }
4746
4747 /*
4748 * Reaching here means the remote WAL position has been received, and all
4749 * transactions up to that position on the publisher have been applied and
4750 * flushed locally. So, we can advance the non-removable transaction ID.
4751 */
4755
4756 elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4757 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4758 rdt_data->candidate_xid);
4759
4760 /* Notify launcher to update the xmin of the conflict slot */
4762
4764
4765 /* process the next phase */
4767}
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
Definition worker.c:4776
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
bool HasSubscriptionTablesCached(void)
Definition tablesync.c:1659

References AllTablesyncsReady(), ApplyLauncherWakeup(), Assert, DEBUG2, elog, ereport, errdetail_internal(), errmsg_internal(), ERROR, fb(), get_flush_position(), GetCurrentTimestamp(), HasSubscriptionTablesCached(), last_flushpos, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, now(), LogicalRepWorker::oldest_nonremovable_xid, process_rdt_phase_transition(), RDT_RESUME_CONFLICT_INFO_RETENTION, RDT_STOP_CONFLICT_INFO_RETENTION, LogicalRepWorker::relmutex, reset_retention_data_fields(), Subscription::retentionactive, should_stop_conflict_info_retention(), SpinLockAcquire(), SpinLockRelease(), TimestampDifferenceExceeds(), TimestampDifferenceMilliseconds(), TransactionIdIsValid, WalWriterDelay, and XLogRecPtrIsValid.

Referenced by process_rdt_phase_transition().

◆ wait_for_publisher_status()

static void wait_for_publisher_status ( RetainDeadTuplesData rdt_data,
bool  status_received 
)
static

Definition at line 4562 of file worker.c.

4564{
4565 /*
4566 * Return if we have requested but not yet received the publisher status.
4567 */
4568 if (!status_received)
4569 return;
4570
4571 /*
4572 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4573 * retaining conflict information for this worker.
4574 */
4576 {
4578 return;
4579 }
4580
4581 if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4582 rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4583
4584 /*
4585 * Check if all remote concurrent transactions that were active at the
4586 * first status request have now completed. If completed, proceed to the
4587 * next phase; otherwise, continue checking the publisher status until
4588 * these transactions finish.
4589 *
4590 * It's possible that transactions in the commit phase during the last
4591 * cycle have now finished committing, but remote_oldestxid remains older
4592 * than remote_wait_for. This can happen if some old transaction came in
4593 * the commit phase when we requested status in this cycle. We do not
4594 * handle this case explicitly as it's rare and the benefit doesn't
4595 * justify the required complexity. Tracking would require either caching
4596 * all xids at the publisher or sending them to subscribers. The condition
4597 * will resolve naturally once the remaining transactions are finished.
4598 *
4599 * Directly advancing the non-removable transaction ID is possible if
4600 * there are no activities on the publisher since the last advancement
4601 * cycle. However, it requires maintaining two fields, last_remote_nextxid
4602 * and last_remote_lsn, within the structure for comparison with the
4603 * current cycle's values. Considering the minimal cost of continuing in
4604 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4605 * advance the transaction ID here.
4606 */
4607 if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4608 rdt_data->remote_oldestxid))
4610 else
4612
4613 /* process the next phase */
4615}
#define FullTransactionIdPrecedesOrEquals(a, b)
Definition transam.h:52
#define FullTransactionIdIsValid(x)
Definition transam.h:55

References fb(), FullTransactionIdIsValid, FullTransactionIdPrecedesOrEquals, process_rdt_phase_transition(), RDT_REQUEST_PUBLISHER_STATUS, RDT_STOP_CONFLICT_INFO_RETENTION, RDT_WAIT_FOR_LOCAL_FLUSH, and should_stop_conflict_info_retention().

Referenced by process_rdt_phase_transition().

Variable Documentation

◆ apply_error_callback_arg

ApplyErrorCallbackArg apply_error_callback_arg
static
Initial value:
=
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
}

Definition at line 464 of file worker.c.

465{
466 .command = 0,
467 .rel = NULL,
468 .remote_attnum = -1,
469 .remote_xid = InvalidTransactionId,
470 .finish_lsn = InvalidXLogRecPtr,
471 .origin_name = NULL,
472};

Referenced by apply_dispatch(), apply_error_callback(), apply_handle_delete(), apply_handle_insert(), apply_handle_update(), reset_apply_error_context_info(), set_apply_error_context_origin(), set_apply_error_context_xact(), slot_modify_data(), and slot_store_data().

◆ apply_error_context_stack

ErrorContextCallback* apply_error_context_stack = NULL

Definition at line 474 of file worker.c.

Referenced by LogicalRepApplyLoop(), and ProcessParallelApplyMessage().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ in_streamed_transaction

◆ InitializingApplyWorker

bool InitializingApplyWorker = false

Definition at line 504 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ last_flushpos

XLogRecPtr last_flushpos = InvalidXLogRecPtr
static

Definition at line 532 of file worker.c.

Referenced by send_feedback(), and wait_for_local_flush().

◆ LogicalStreamingContext

MemoryContext LogicalStreamingContext = NULL
static

◆ LogRepWorkerWalRcvConn

◆ lsn_mapping

dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
static

Definition at line 313 of file worker.c.

Referenced by get_flush_position(), LogicalRepApplyLoop(), and store_flush_position().

◆ MySubscription

◆ MySubscriptionValid

bool MySubscriptionValid = false
static

◆ on_commit_wakeup_workers_subids

List* on_commit_wakeup_workers_subids = NIL
static

Definition at line 487 of file worker.c.

Referenced by AtEOXact_LogicalRepWorkers(), and LogicalRepWorkersWakeupAtCommit().

◆ parallel_stream_nchanges

uint32 parallel_stream_nchanges = 0
static

◆ remote_final_lsn

◆ skip_xact_finish_lsn

XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
static

Definition at line 521 of file worker.c.

Referenced by maybe_start_skipping_changes(), and stop_skipping_changes().

◆ stream_fd

◆ stream_xid

◆ subxact_data