PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Enumerations

enum  TransApplyAction {
  TRANS_LEADER_APPLY , TRANS_LEADER_SERIALIZE , TRANS_LEADER_SEND_TO_PARALLEL , TRANS_LEADER_PARTIAL_SERIALIZE ,
  TRANS_PARALLEL_APPLY
}
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first_segment)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_open_and_write_change (TransactionId xid, char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void DisableSubscriptionAndExit (void)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
 
static bool FindReplTupleInLocalRel (EState *estate, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void TwoPhaseTransactionGid (Oid subid, TransactionId xid, char *gid, int szgid)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static TransApplyAction get_transaction_apply_action (TransactionId xid, ParallelApplyWorkerInfo **winfo)
 
static const char * get_worker_name (void)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionDatacreate_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
static void apply_handle_stream_start (StringInfo s)
 
void stream_stop_internal (TransactionId xid)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void stream_abort_internal (TransactionId xid, TransactionId subxid)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void ensure_last_message (FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
void apply_dispatch (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void apply_worker_exit (void)
 
void maybe_reread_subscription (void)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **myslotname)
 
static void start_apply (XLogRecPtr origin_startpos)
 
void InitializeApplyWorker (void)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void apply_error_callback (void *arg)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 
void set_apply_error_context_origin (char *originname)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
ApplyErrorCallbackArg apply_error_callback_arg
 
ErrorContextCallbackapply_error_context_stack = NULL
 
MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnLogRepWorkerWalRcvConn = NULL
 
SubscriptionMySubscription = NULL
 
static bool MySubscriptionValid = false
 
static Liston_commit_wakeup_workers_subids = NIL
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static uint32 parallel_stream_nchanges = 0
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFilestream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 349 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 211 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

◆ ApplyExecutionData

◆ ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Enumeration Type Documentation

◆ TransApplyAction

Enumerator
TRANS_LEADER_APPLY 
TRANS_LEADER_SERIALIZE 
TRANS_LEADER_SEND_TO_PARALLEL 
TRANS_LEADER_PARTIAL_SERIALIZE 
TRANS_PARALLEL_APPLY 

Definition at line 281 of file worker.c.

282 {
283  /* The action for non-streaming transactions. */
285 
286  /* Actions for streaming transactions. */
TransApplyAction
Definition: worker.c:282
@ TRANS_LEADER_SERIALIZE
Definition: worker.c:287
@ TRANS_PARALLEL_APPLY
Definition: worker.c:290
@ TRANS_LEADER_SEND_TO_PARALLEL
Definition: worker.c:288
@ TRANS_LEADER_APPLY
Definition: worker.c:284
@ TRANS_LEADER_PARTIAL_SERIALIZE
Definition: worker.c:289

Function Documentation

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3236 of file worker.c.

3237 {
3239  LogicalRepMsgType saved_command;
3240 
3241  /*
3242  * Set the current command being applied. Since this function can be
3243  * called recursively when applying spooled changes, save the current
3244  * command.
3245  */
3246  saved_command = apply_error_callback_arg.command;
3248 
3249  switch (action)
3250  {
3251  case LOGICAL_REP_MSG_BEGIN:
3252  apply_handle_begin(s);
3253  break;
3254 
3257  break;
3258 
3261  break;
3262 
3265  break;
3266 
3269  break;
3270 
3273  break;
3274 
3277  break;
3278 
3279  case LOGICAL_REP_MSG_TYPE:
3280  apply_handle_type(s);
3281  break;
3282 
3285  break;
3286 
3288 
3289  /*
3290  * Logical replication does not use generic logical messages yet.
3291  * Although, it could be used by other applications that use this
3292  * output plugin.
3293  */
3294  break;
3295 
3298  break;
3299 
3302  break;
3303 
3306  break;
3307 
3310  break;
3311 
3314  break;
3315 
3318  break;
3319 
3322  break;
3323 
3326  break;
3327 
3330  break;
3331 
3332  default:
3333  ereport(ERROR,
3334  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3335  errmsg("invalid logical replication message type \"%c\"", action)));
3336  }
3337 
3338  /* Reset the current command */
3339  apply_error_callback_arg.command = saved_command;
3340 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1291
static void apply_handle_type(StringInfo s)
Definition: worker.c:2346
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3114
static void apply_handle_update(StringInfo s)
Definition: worker.c:2529
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2153
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1189
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:294
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2700
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1011
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1036
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1832
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2323
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1128
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1238
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1646
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1428
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1062
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1487
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2393
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
LogicalRepMsgType command
Definition: worker.c:237

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4854 of file worker.c.

4855 {
4857 
4859  return;
4860 
4861  Assert(errarg->origin_name);
4862 
4863  if (errarg->rel == NULL)
4864  {
4865  if (!TransactionIdIsValid(errarg->remote_xid))
4866  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4867  errarg->origin_name,
4868  logicalrep_message_type(errarg->command));
4869  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4870  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4871  errarg->origin_name,
4873  errarg->remote_xid);
4874  else
4875  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4876  errarg->origin_name,
4878  errarg->remote_xid,
4879  LSN_FORMAT_ARGS(errarg->finish_lsn));
4880  }
4881  else
4882  {
4883  if (errarg->remote_attnum < 0)
4884  {
4885  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4886  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4887  errarg->origin_name,
4889  errarg->rel->remoterel.nspname,
4890  errarg->rel->remoterel.relname,
4891  errarg->remote_xid);
4892  else
4893  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4894  errarg->origin_name,
4896  errarg->rel->remoterel.nspname,
4897  errarg->rel->remoterel.relname,
4898  errarg->remote_xid,
4899  LSN_FORMAT_ARGS(errarg->finish_lsn));
4900  }
4901  else
4902  {
4903  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4904  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4905  errarg->origin_name,
4907  errarg->rel->remoterel.nspname,
4908  errarg->rel->remoterel.relname,
4909  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4910  errarg->remote_xid);
4911  else
4912  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4913  errarg->origin_name,
4915  errarg->rel->remoterel.nspname,
4916  errarg->rel->remoterel.relname,
4917  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4918  errarg->remote_xid,
4919  LSN_FORMAT_ARGS(errarg->finish_lsn));
4920  }
4921  }
4922 }
#define errcontext
Definition: elog.h:196
Assert(fmt[strlen(fmt) - 1] !='\n')
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:242
XLogRecPtr finish_lsn
Definition: worker.c:243
LogicalRepRelMapEntry * rel
Definition: worker.c:238
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 1011 of file worker.c.

1012 {
1013  LogicalRepBeginData begin_data;
1014 
1015  /* There must not be an active streaming transaction. */
1017 
1018  logicalrep_read_begin(s, &begin_data);
1019  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1020 
1021  remote_final_lsn = begin_data.final_lsn;
1022 
1024 
1025  in_remote_transaction = true;
1026 
1028 }
bool in_remote_transaction
Definition: worker.c:319
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4926
static XLogRecPtr remote_final_lsn
Definition: worker.c:320
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4724
static TransactionId stream_xid
Definition: worker.c:325
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131

References Assert(), LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 1062 of file worker.c.

1063 {
1064  LogicalRepPreparedTxnData begin_data;
1065 
1066  /* Tablesync should never receive prepare. */
1067  if (am_tablesync_worker())
1068  ereport(ERROR,
1069  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1070  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1071 
1072  /* There must not be an active streaming transaction. */
1074 
1075  logicalrep_read_begin_prepare(s, &begin_data);
1076  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1077 
1078  remote_final_lsn = begin_data.prepare_lsn;
1079 
1081 
1082  in_remote_transaction = true;
1083 
1085 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, stream_xid, TransactionIdIsValid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 1036 of file worker.c.

1037 {
1038  LogicalRepCommitData commit_data;
1039 
1040  logicalrep_read_commit(s, &commit_data);
1041 
1042  if (commit_data.commit_lsn != remote_final_lsn)
1043  ereport(ERROR,
1044  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1045  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1046  LSN_FORMAT_ARGS(commit_data.commit_lsn),
1048 
1049  apply_handle_commit_internal(&commit_data);
1050 
1051  /* Process any tables that are being synchronized in parallel. */
1052  process_syncing_tables(commit_data.end_lsn);
1053 
1056 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:2263
static void reset_apply_error_context_info(void)
Definition: worker.c:4934
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:647

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData commit_data)
static

Definition at line 2263 of file worker.c.

2264 {
2265  if (is_skipping_changes())
2266  {
2268 
2269  /*
2270  * Start a new transaction to clear the subskiplsn, if not started
2271  * yet.
2272  */
2273  if (!IsTransactionState())
2275  }
2276 
2277  if (IsTransactionState())
2278  {
2279  /*
2280  * The transaction is either non-empty or skipped, so we clear the
2281  * subskiplsn.
2282  */
2284 
2285  /*
2286  * Update origin state so we can restart streaming from correct
2287  * position in case of crash.
2288  */
2289  replorigin_session_origin_lsn = commit_data->end_lsn;
2291 
2293 
2294  if (IsTransactionBlock())
2295  {
2296  EndTransactionBlock(false);
2298  }
2299 
2300  pgstat_report_stat(false);
2301 
2303  }
2304  else
2305  {
2306  /* Process any invalidation messages that might have accumulated. */
2309  }
2310 
2311  in_remote_transaction = false;
2312 }
static void stop_skipping_changes(void)
Definition: worker.c:4751
#define is_skipping_changes()
Definition: worker.c:349
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:4773
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3400
void maybe_reread_subscription(void)
Definition: worker.c:3823
void AcceptInvalidationMessages(void)
Definition: inval.c:746
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
long pgstat_report_stat(bool force)
Definition: pgstat.c:575
TimestampTz committime
Definition: logicalproto.h:138
bool IsTransactionState(void)
Definition: xact.c:378
void StartTransactionCommand(void)
Definition: xact.c:2944
bool IsTransactionBlock(void)
Definition: xact.c:4823
void CommitTransactionCommand(void)
Definition: xact.c:3041
bool EndTransactionBlock(bool chain)
Definition: xact.c:3897
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:258

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, EndTransactionBlock(), in_remote_transaction, is_skipping_changes, IsTransactionBlock(), IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 1189 of file worker.c.

1190 {
1191  LogicalRepCommitPreparedTxnData prepare_data;
1192  char gid[GIDSIZE];
1193 
1194  logicalrep_read_commit_prepared(s, &prepare_data);
1195  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1196 
1197  /* Compute GID for two_phase transactions. */
1199  gid, sizeof(gid));
1200 
1201  /* There is no transaction when COMMIT PREPARED is called */
1203 
1204  /*
1205  * Update origin state so we can restart streaming from correct position
1206  * in case of crash.
1207  */
1208  replorigin_session_origin_lsn = prepare_data.end_lsn;
1210 
1211  FinishPreparedTransaction(gid, true);
1214  pgstat_report_stat(false);
1215 
1217  in_remote_transaction = false;
1218 
1219  /* Process any tables that are being synchronized in parallel. */
1220  process_syncing_tables(prepare_data.end_lsn);
1221 
1222  clear_subscription_skip_lsn(prepare_data.end_lsn);
1223 
1226 }
static void begin_replication_step(void)
Definition: worker.c:528
static void end_replication_step(void)
Definition: worker.c:551
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:4298
Subscription * MySubscription
Definition: worker.c:314
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1480
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 2700 of file worker.c.

2701 {
2702  LogicalRepRelMapEntry *rel;
2703  LogicalRepTupleData oldtup;
2704  LogicalRepRelId relid;
2705  ApplyExecutionData *edata;
2706  EState *estate;
2707  TupleTableSlot *remoteslot;
2708  MemoryContext oldctx;
2709 
2710  /*
2711  * Quick return if we are skipping data modification changes or handling
2712  * streamed transactions.
2713  */
2714  if (is_skipping_changes() ||
2716  return;
2717 
2719 
2720  relid = logicalrep_read_delete(s, &oldtup);
2721  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2722  if (!should_apply_changes_for_rel(rel))
2723  {
2724  /*
2725  * The relation can't become interesting in the middle of the
2726  * transaction so it's safe to unlock it.
2727  */
2730  return;
2731  }
2732 
2733  /* Set relation for error callback */
2735 
2736  /* Check if we can do the delete. */
2738 
2739  /* Initialize the executor state. */
2740  edata = create_edata_for_relation(rel);
2741  estate = edata->estate;
2742  remoteslot = ExecInitExtraTupleSlot(estate,
2743  RelationGetDescr(rel->localrel),
2744  &TTSOpsVirtual);
2745 
2746  /* Build the search tuple. */
2748  slot_store_data(remoteslot, rel, &oldtup);
2749  MemoryContextSwitchTo(oldctx);
2750 
2751  /* For a partitioned table, apply delete to correct partition. */
2752  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2754  remoteslot, NULL, CMD_DELETE);
2755  else
2757  remoteslot, rel->localindexoid);
2758 
2759  finish_edata(edata);
2760 
2761  /* Reset relation for error callback */
2763 
2765 
2767 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:2488
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:672
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:497
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:579
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2863
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:817
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:729
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, Oid localindexoid)
Definition: worker.c:2775
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:553
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:101
@ CMD_DELETE
Definition: nodes.h:279
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:564
#define RelationGetDescr(relation)
Definition: rel.h:529
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:325
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:471
ResultRelInfo * targetRelInfo
Definition: worker.c:227
EState * estate
Definition: worker.c:224
Form_pg_class rd_rel
Definition: rel.h:110

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
Oid  localindexoid 
)
static

Definition at line 2775 of file worker.c.

2779 {
2780  EState *estate = edata->estate;
2781  Relation localrel = relinfo->ri_RelationDesc;
2782  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2783  EPQState epqstate;
2784  TupleTableSlot *localslot;
2785  bool found;
2786 
2787  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2788  ExecOpenIndices(relinfo, false);
2789 
2790  found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
2791  remoteslot, &localslot);
2792 
2793  /* If found delete it. */
2794  if (found)
2795  {
2796  EvalPlanQualSetSlot(&epqstate, localslot);
2797 
2798  /* Do the actual delete. */
2800  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2801  }
2802  else
2803  {
2804  /*
2805  * The tuple to be deleted could not be found. Do nothing except for
2806  * emitting a log message.
2807  *
2808  * XXX should this be promoted to ereport(LOG) perhaps?
2809  */
2810  elog(DEBUG1,
2811  "logical replication did not find row to be deleted "
2812  "in replication target relation \"%s\"",
2813  RelationGetRelationName(localrel));
2814  }
2815 
2816  /* Cleanup. */
2817  ExecCloseIndices(relinfo);
2818  EvalPlanQualEnd(&epqstate);
2819 }
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, Oid localidxoid, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2829
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:2361
#define DEBUG1
Definition: elog.h:30
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2933
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2511
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:242
#define ACL_DELETE
Definition: parsenodes.h:86
#define NIL
Definition: pg_list.h:68
#define RelationGetRelationName(relation)
Definition: rel.h:537
LogicalRepRelMapEntry * targetRel
Definition: worker.c:226
Relation ri_RelationDesc
Definition: execnodes.h:450

References ACL_DELETE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 2393 of file worker.c.

2394 {
2395  LogicalRepRelMapEntry *rel;
2396  LogicalRepTupleData newtup;
2397  LogicalRepRelId relid;
2398  ApplyExecutionData *edata;
2399  EState *estate;
2400  TupleTableSlot *remoteslot;
2401  MemoryContext oldctx;
2402 
2403  /*
2404  * Quick return if we are skipping data modification changes or handling
2405  * streamed transactions.
2406  */
2407  if (is_skipping_changes() ||
2409  return;
2410 
2412 
2413  relid = logicalrep_read_insert(s, &newtup);
2414  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2415  if (!should_apply_changes_for_rel(rel))
2416  {
2417  /*
2418  * The relation can't become interesting in the middle of the
2419  * transaction so it's safe to unlock it.
2420  */
2423  return;
2424  }
2425 
2426  /* Set relation for error callback */
2428 
2429  /* Initialize the executor state. */
2430  edata = create_edata_for_relation(rel);
2431  estate = edata->estate;
2432  remoteslot = ExecInitExtraTupleSlot(estate,
2433  RelationGetDescr(rel->localrel),
2434  &TTSOpsVirtual);
2435 
2436  /* Process and store remote tuple in the slot */
2438  slot_store_data(remoteslot, rel, &newtup);
2439  slot_fill_defaults(rel, estate, remoteslot);
2440  MemoryContextSwitchTo(oldctx);
2441 
2442  /* For a partitioned table, insert the tuple into a partition. */
2443  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2445  remoteslot, NULL, CMD_INSERT);
2446  else
2448  remoteslot);
2449 
2450  finish_edata(edata);
2451 
2452  /* Reset relation for error callback */
2454 
2456 
2458 }
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2466
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:760
@ CMD_INSERT
Definition: nodes.h:278
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2466 of file worker.c.

2469 {
2470  EState *estate = edata->estate;
2471 
2472  /* We must open indexes here. */
2473  ExecOpenIndices(relinfo, false);
2474 
2475  /* Do the insert. */
2477  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2478 
2479  /* Cleanup. */
2480  ExecCloseIndices(relinfo);
2481 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:83

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1428 of file worker.c.

1429 {
1430  /*
1431  * ORIGIN message can only come inside streaming transaction or inside
1432  * remote transaction and before any actual writes.
1433  */
1434  if (!in_streamed_transaction &&
1437  ereport(ERROR,
1438  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1439  errmsg_internal("ORIGIN message sent out of order")));
1440 }
static bool in_streamed_transaction
Definition: worker.c:323

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 1128 of file worker.c.

1129 {
1130  LogicalRepPreparedTxnData prepare_data;
1131 
1132  logicalrep_read_prepare(s, &prepare_data);
1133 
1134  if (prepare_data.prepare_lsn != remote_final_lsn)
1135  ereport(ERROR,
1136  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1137  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1138  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1140 
1141  /*
1142  * Unlike commit, here, we always prepare the transaction even though no
1143  * change has happened in this transaction or all changes are skipped. It
1144  * is done this way because at commit prepared time, we won't know whether
1145  * we have skipped preparing a transaction because of those reasons.
1146  *
1147  * XXX, We can optimize such that at commit prepared time, we first check
1148  * whether we have prepared the transaction or not but that doesn't seem
1149  * worthwhile because such cases shouldn't be common.
1150  */
1152 
1153  apply_handle_prepare_internal(&prepare_data);
1154 
1157  pgstat_report_stat(false);
1158 
1160 
1161  in_remote_transaction = false;
1162 
1163  /* Process any tables that are being synchronized in parallel. */
1164  process_syncing_tables(prepare_data.end_lsn);
1165 
1166  /*
1167  * Since we have already prepared the transaction, in a case where the
1168  * server crashes before clearing the subskiplsn, it will be left but the
1169  * transaction won't be resent. But that's okay because it's a rare case
1170  * and the subskiplsn will be cleared when finishing the next transaction.
1171  */
1174 
1177 }
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:1091
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 1091 of file worker.c.

1092 {
1093  char gid[GIDSIZE];
1094 
1095  /*
1096  * Compute unique GID for two_phase transactions. We don't use GID of
1097  * prepared transaction sent by server as that can lead to deadlock when
1098  * we have multiple subscriptions from same node point to publications on
1099  * the same node. See comments atop worker.c
1100  */
1101  TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1102  gid, sizeof(gid));
1103 
1104  /*
1105  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1106  * called within the PrepareTransactionBlock below.
1107  */
1108  if (!IsTransactionBlock())
1109  {
1111  CommitTransactionCommand(); /* Completes the preceding Begin command. */
1112  }
1113 
1114  /*
1115  * Update origin state so we can restart streaming from correct position
1116  * in case of crash.
1117  */
1118  replorigin_session_origin_lsn = prepare_data->end_lsn;
1120 
1122 }
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3845
void BeginTransactionBlock(void)
Definition: xact.c:3777

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, IsTransactionBlock(), MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 2323 of file worker.c.

2324 {
2325  LogicalRepRelation *rel;
2326 
2328  return;
2329 
2330  rel = logicalrep_read_rel(s);
2332 
2333  /* Also reset all entries in the partition map that refer to remoterel. */
2335 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:700
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:538
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:162

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1238 of file worker.c.

1239 {
1240  LogicalRepRollbackPreparedTxnData rollback_data;
1241  char gid[GIDSIZE];
1242 
1243  logicalrep_read_rollback_prepared(s, &rollback_data);
1244  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1245 
1246  /* Compute GID for two_phase transactions. */
1247  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1248  gid, sizeof(gid));
1249 
1250  /*
1251  * It is possible that we haven't received prepare because it occurred
1252  * before walsender reached a consistent point or the two_phase was still
1253  * not enabled by that time, so in such cases, we need to skip rollback
1254  * prepared.
1255  */
1256  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1257  rollback_data.prepare_time))
1258  {
1259  /*
1260  * Update origin state so we can restart streaming from correct
1261  * position in case of crash.
1262  */
1265 
1266  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1268  FinishPreparedTransaction(gid, false);
1271 
1273  }
1274 
1275  pgstat_report_stat(false);
1276 
1278  in_remote_transaction = false;
1279 
1280  /* Process any tables that are being synchronized in parallel. */
1282 
1285 }
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2577

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), XactLastCommitEnd, and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1832 of file worker.c.

1833 {
1834  TransactionId xid;
1835  TransactionId subxid;
1836  LogicalRepStreamAbortData abort_data;
1837  ParallelApplyWorkerInfo *winfo;
1838  TransApplyAction apply_action;
1839 
1840  /* Save the message before it is consumed. */
1841  StringInfoData original_msg = *s;
1842  bool toplevel_xact;
1843 
1845  ereport(ERROR,
1846  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1847  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1848 
1849  /* We receive abort information only when we can apply in parallel. */
1850  logicalrep_read_stream_abort(s, &abort_data,
1852 
1853  xid = abort_data.xid;
1854  subxid = abort_data.subxid;
1855  toplevel_xact = (xid == subxid);
1856 
1857  set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1858 
1859  apply_action = get_transaction_apply_action(xid, &winfo);
1860 
1861  switch (apply_action)
1862  {
1863  case TRANS_LEADER_APPLY:
1864 
1865  /*
1866  * We are in the leader apply worker and the transaction has been
1867  * serialized to file.
1868  */
1869  stream_abort_internal(xid, subxid);
1870 
1871  elog(DEBUG1, "finished processing the STREAM ABORT command");
1872  break;
1873 
1875  Assert(winfo);
1876 
1877  /*
1878  * For the case of aborting the subtransaction, we increment the
1879  * number of streaming blocks and take the lock again before
1880  * sending the STREAM_ABORT to ensure that the parallel apply
1881  * worker will wait on the lock for the next set of changes after
1882  * processing the STREAM_ABORT message if it is not already
1883  * waiting for STREAM_STOP message.
1884  *
1885  * It is important to perform this locking before sending the
1886  * STREAM_ABORT message so that the leader can hold the lock first
1887  * and the parallel apply worker will wait for the leader to
1888  * release the lock. This is the same as what we do in
1889  * apply_handle_stream_stop. See Locking Considerations atop
1890  * applyparallelworker.c.
1891  */
1892  if (!toplevel_xact)
1893  {
1897  }
1898 
1899  if (pa_send_data(winfo, s->len, s->data))
1900  {
1901  /*
1902  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1903  * wait here for the parallel apply worker to finish as that
1904  * is not required to maintain the commit order and won't have
1905  * the risk of failures due to transaction dependencies and
1906  * deadlocks. However, it is possible that before the parallel
1907  * worker finishes and we clear the worker info, the xid
1908  * wraparound happens on the upstream and a new transaction
1909  * with the same xid can appear and that can lead to duplicate
1910  * entries in ParallelApplyTxnHash. Yet another problem could
1911  * be that we may have serialized the changes in partial
1912  * serialize mode and the file containing xact changes may
1913  * already exist, and after xid wraparound trying to create
1914  * the file for the same xid can lead to an error. To avoid
1915  * these problems, we decide to wait for the aborts to finish.
1916  *
1917  * Note, it is okay to not update the flush location position
1918  * for aborts as in worst case that means such a transaction
1919  * won't be sent again after restart.
1920  */
1921  if (toplevel_xact)
1923 
1924  break;
1925  }
1926 
1927  /*
1928  * Switch to serialize mode when we are not able to send the
1929  * change to parallel apply worker.
1930  */
1931  pa_switch_to_partial_serialize(winfo, true);
1932 
1933  /* fall through */
1935  Assert(winfo);
1936 
1937  /*
1938  * Parallel apply worker might have applied some changes, so write
1939  * the STREAM_ABORT message so that it can rollback the
1940  * subtransaction if needed.
1941  */
1943  &original_msg);
1944 
1945  if (toplevel_xact)
1946  {
1949  }
1950  break;
1951 
1952  case TRANS_PARALLEL_APPLY:
1953 
1954  /*
1955  * If the parallel apply worker is applying spooled messages then
1956  * close the file before aborting.
1957  */
1958  if (toplevel_xact && stream_fd)
1960 
1961  pa_stream_abort(&abort_data);
1962 
1963  /*
1964  * We need to wait after processing rollback to savepoint for the
1965  * next set of changes.
1966  *
1967  * We have a race condition here due to which we can start waiting
1968  * here when there are more chunk of streams in the queue. See
1969  * apply_handle_stream_stop.
1970  */
1971  if (!toplevel_xact)
1973 
1974  elog(DEBUG1, "finished processing the STREAM ABORT command");
1975  break;
1976 
1977  default:
1978  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1979  break;
1980  }
1981 
1983 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
Definition: atomics.h:381
static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
Definition: worker.c:5011
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
Definition: worker.c:4266
static BufFile * stream_fd
Definition: worker.c:352
static void stream_abort_internal(TransactionId xid, TransactionId subxid)
Definition: worker.c:1749
static void stream_close_file(void)
Definition: worker.c:4218
uint32 TransactionId
Definition: c.h:636
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
#define AccessExclusiveLock
Definition: lockdefs.h:43
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
Definition: proto.c:1192
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
@ FS_SERIALIZE_DONE
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_ABORT, logicalrep_read_stream_abort(), MyLogicalRepWorker, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_fileset_state(), pa_stream_abort(), pa_switch_to_partial_serialize(), pa_unlock_stream(), pa_xact_finish(), LogicalRepWorker::parallel_apply, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, stream_abort_internal(), stream_close_file(), stream_fd, stream_open_and_write_change(), LogicalRepStreamAbortData::subxid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and LogicalRepStreamAbortData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 2153 of file worker.c.

2154 {
2155  TransactionId xid;
2156  LogicalRepCommitData commit_data;
2157  ParallelApplyWorkerInfo *winfo;
2158  TransApplyAction apply_action;
2159 
2160  /* Save the message before it is consumed. */
2161  StringInfoData original_msg = *s;
2162 
2164  ereport(ERROR,
2165  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2166  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2167 
2168  xid = logicalrep_read_stream_commit(s, &commit_data);
2169  set_apply_error_context_xact(xid, commit_data.commit_lsn);
2170 
2171  apply_action = get_transaction_apply_action(xid, &winfo);
2172 
2173  switch (apply_action)
2174  {
2175  case TRANS_LEADER_APPLY:
2176 
2177  /*
2178  * The transaction has been serialized to file, so replay all the
2179  * spooled operations.
2180  */
2182  commit_data.commit_lsn);
2183 
2184  apply_handle_commit_internal(&commit_data);
2185 
2186  /* Unlink the files with serialized changes and subxact info. */
2188 
2189  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2190  break;
2191 
2193  Assert(winfo);
2194 
2195  if (pa_send_data(winfo, s->len, s->data))
2196  {
2197  /* Finish processing the streaming transaction. */
2198  pa_xact_finish(winfo, commit_data.end_lsn);
2199  break;
2200  }
2201 
2202  /*
2203  * Switch to serialize mode when we are not able to send the
2204  * change to parallel apply worker.
2205  */
2206  pa_switch_to_partial_serialize(winfo, true);
2207 
2208  /* fall through */
2210  Assert(winfo);
2211 
2213  &original_msg);
2214 
2216 
2217  /* Finish processing the streaming transaction. */
2218  pa_xact_finish(winfo, commit_data.end_lsn);
2219  break;
2220 
2221  case TRANS_PARALLEL_APPLY:
2222 
2223  /*
2224  * If the parallel apply worker is applying spooled messages then
2225  * close the file before committing.
2226  */
2227  if (stream_fd)
2229 
2230  apply_handle_commit_internal(&commit_data);
2231 
2233 
2234  /*
2235  * It is important to set the transaction state as finished before
2236  * releasing the lock. See pa_wait_for_xact_finish.
2237  */
2240 
2242 
2243  elog(DEBUG1, "finished processing the STREAM COMMIT command");
2244  break;
2245 
2246  default:
2247  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2248  break;
2249  }
2250 
2251  /* Process any tables that are being synchronized in parallel. */
2252  process_syncing_tables(commit_data.end_lsn);
2253 
2255 
2257 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
ParallelApplyWorkerShared * MyParallelShared
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:4149
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2021
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1137
FileSet * stream_fileset
@ PARALLEL_TRANS_FINISHED

References AccessExclusiveLock, apply_handle_commit_internal(), apply_spooled_messages(), Assert(), LogicalRepCommitData::commit_lsn, StringInfoData::data, DEBUG1, elog(), LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_COMMIT, logicalrep_read_stream_commit(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, and XactLastCommitEnd.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1291 of file worker.c.

1292 {
1293  LogicalRepPreparedTxnData prepare_data;
1294  ParallelApplyWorkerInfo *winfo;
1295  TransApplyAction apply_action;
1296 
1297  /* Save the message before it is consumed. */
1298  StringInfoData original_msg = *s;
1299 
1301  ereport(ERROR,
1302  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1303  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1304 
1305  /* Tablesync should never receive prepare. */
1306  if (am_tablesync_worker())
1307  ereport(ERROR,
1308  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1309  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1310 
1311  logicalrep_read_stream_prepare(s, &prepare_data);
1312  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1313 
1314  apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1315 
1316  switch (apply_action)
1317  {
1318  case TRANS_LEADER_APPLY:
1319 
1320  /*
1321  * The transaction has been serialized to file, so replay all the
1322  * spooled operations.
1323  */
1325  prepare_data.xid, prepare_data.prepare_lsn);
1326 
1327  /* Mark the transaction as prepared. */
1328  apply_handle_prepare_internal(&prepare_data);
1329 
1331 
1333 
1334  in_remote_transaction = false;
1335 
1336  /* Unlink the files with serialized changes and subxact info. */
1338 
1339  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1340  break;
1341 
1343  Assert(winfo);
1344 
1345  if (pa_send_data(winfo, s->len, s->data))
1346  {
1347  /* Finish processing the streaming transaction. */
1348  pa_xact_finish(winfo, prepare_data.end_lsn);
1349  break;
1350  }
1351 
1352  /*
1353  * Switch to serialize mode when we are not able to send the
1354  * change to parallel apply worker.
1355  */
1356  pa_switch_to_partial_serialize(winfo, true);
1357 
1358  /* fall through */
1360  Assert(winfo);
1361 
1362  stream_open_and_write_change(prepare_data.xid,
1364  &original_msg);
1365 
1367 
1368  /* Finish processing the streaming transaction. */
1369  pa_xact_finish(winfo, prepare_data.end_lsn);
1370  break;
1371 
1372  case TRANS_PARALLEL_APPLY:
1373 
1374  /*
1375  * If the parallel apply worker is applying spooled messages then
1376  * close the file before preparing.
1377  */
1378  if (stream_fd)
1380 
1382 
1383  /* Mark the transaction as prepared. */
1384  apply_handle_prepare_internal(&prepare_data);
1385 
1387 
1389 
1391 
1394 
1396 
1397  elog(DEBUG1, "finished processing the STREAM PREPARE command");
1398  break;
1399 
1400  default:
1401  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1402  break;
1403  }
1404 
1405  pgstat_report_stat(false);
1406 
1407  /* Process any tables that are being synchronized in parallel. */
1408  process_syncing_tables(prepare_data.end_lsn);
1409 
1410  /*
1411  * Similar to prepare case, the subskiplsn could be left in a case of
1412  * server crash but it's okay. See the comments in apply_handle_prepare().
1413  */
1416 
1418 
1420 }
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References AccessExclusiveLock, am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), Assert(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), StringInfoData::data, DEBUG1, elog(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FS_SERIALIZE_DONE, get_transaction_apply_action(), in_remote_transaction, in_streamed_transaction, ParallelApplyWorkerShared::last_commit_end, StringInfoData::len, LOGICAL_REP_MSG_STREAM_PREPARE, logicalrep_read_stream_prepare(), MyLogicalRepWorker, MyParallelShared, pa_reset_subtrans(), pa_send_data(), pa_set_fileset_state(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_transaction(), pa_xact_finish(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), stream_close_file(), stream_fd, LogicalRepWorker::stream_fileset, stream_open_and_write_change(), LogicalRepWorker::subid, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_PARALLEL_APPLY, XactLastCommitEnd, LogicalRepPreparedTxnData::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1487 of file worker.c.

1488 {
1489  bool first_segment;
1490  ParallelApplyWorkerInfo *winfo;
1491  TransApplyAction apply_action;
1492 
1493  /* Save the message before it is consumed. */
1494  StringInfoData original_msg = *s;
1495 
1497  ereport(ERROR,
1498  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1499  errmsg_internal("duplicate STREAM START message")));
1500 
1501  /* There must not be an active streaming transaction. */
1503 
1504  /* notify handle methods we're processing a remote transaction */
1505  in_streamed_transaction = true;
1506 
1507  /* extract XID of the top-level transaction */
1508  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1509 
1511  ereport(ERROR,
1512  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1513  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1514 
1516 
1517  /* Try to allocate a worker for the streaming transaction. */
1518  if (first_segment)
1520 
1521  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1522 
1523  switch (apply_action)
1524  {
1526 
1527  /*
1528  * Function stream_start_internal starts a transaction. This
1529  * transaction will be committed on the stream stop unless it is a
1530  * tablesync worker in which case it will be committed after
1531  * processing all the messages. We need this transaction for
1532  * handling the BufFile, used for serializing the streaming data
1533  * and subxact info.
1534  */
1535  stream_start_internal(stream_xid, first_segment);
1536  break;
1537 
1539  Assert(winfo);
1540 
1541  /*
1542  * Once we start serializing the changes, the parallel apply
1543  * worker will wait for the leader to release the stream lock
1544  * until the end of the transaction. So, we don't need to release
1545  * the lock or increment the stream count in that case.
1546  */
1547  if (pa_send_data(winfo, s->len, s->data))
1548  {
1549  /*
1550  * Unlock the shared object lock so that the parallel apply
1551  * worker can continue to receive changes.
1552  */
1553  if (!first_segment)
1555 
1556  /*
1557  * Increment the number of streaming blocks waiting to be
1558  * processed by parallel apply worker.
1559  */
1561 
1562  /* Cache the parallel apply worker for this transaction. */
1564  break;
1565  }
1566 
1567  /*
1568  * Switch to serialize mode when we are not able to send the
1569  * change to parallel apply worker.
1570  */
1571  pa_switch_to_partial_serialize(winfo, !first_segment);
1572 
1573  /* fall through */
1575  Assert(winfo);
1576 
1577  /*
1578  * Open the spool file unless it was already opened when switching
1579  * to serialize mode. The transaction started in
1580  * stream_start_internal will be committed on the stream stop.
1581  */
1582  if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1583  stream_start_internal(stream_xid, first_segment);
1584 
1586 
1587  /* Cache the parallel apply worker for this transaction. */
1589  break;
1590 
1591  case TRANS_PARALLEL_APPLY:
1592  if (first_segment)
1593  {
1594  /* Hold the lock until the end of the transaction. */
1597 
1598  /*
1599  * Signal the leader apply worker, as it may be waiting for
1600  * us.
1601  */
1603  }
1604 
1606  break;
1607 
1608  default:
1609  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1610  break;
1611  }
1612 
1614 }
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_allocate_worker(TransactionId xid)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static uint32 parallel_stream_nchanges
Definition: worker.c:331
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:4236
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1449
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:643
#define InvalidOid
Definition: postgres_ext.h:36
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1087
@ PARALLEL_TRANS_STARTED

References AccessExclusiveLock, Assert(), StringInfoData::data, elog(), ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidOid, InvalidXLogRecPtr, StringInfoData::len, LOGICAL_REP_MSG_STREAM_START, logicalrep_read_stream_start(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MyParallelShared, pa_allocate_worker(), pa_lock_transaction(), pa_send_data(), pa_set_stream_apply_worker(), pa_set_xact_state(), pa_switch_to_partial_serialize(), pa_unlock_stream(), parallel_stream_nchanges, PARALLEL_TRANS_STARTED, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_add_fetch_u32(), pgstat_report_activity(), set_apply_error_context_xact(), ParallelApplyWorkerInfo::shared, STATE_RUNNING, stream_start_internal(), stream_write_change(), stream_xid, LogicalRepWorker::subid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, TransactionIdIsValid, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1646 of file worker.c.

1647 {
1648  ParallelApplyWorkerInfo *winfo;
1649  TransApplyAction apply_action;
1650 
1652  ereport(ERROR,
1653  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1654  errmsg_internal("STREAM STOP message without STREAM START")));
1655 
1656  apply_action = get_transaction_apply_action(stream_xid, &winfo);
1657 
1658  switch (apply_action)
1659  {
1662  break;
1663 
1665  Assert(winfo);
1666 
1667  /*
1668  * Lock before sending the STREAM_STOP message so that the leader
1669  * can hold the lock first and the parallel apply worker will wait
1670  * for leader to release the lock. See Locking Considerations atop
1671  * applyparallelworker.c.
1672  */
1674 
1675  if (pa_send_data(winfo, s->len, s->data))
1676  {
1678  break;
1679  }
1680 
1681  /*
1682  * Switch to serialize mode when we are not able to send the
1683  * change to parallel apply worker.
1684  */
1685  pa_switch_to_partial_serialize(winfo, true);
1686 
1687  /* fall through */
1692  break;
1693 
1694  case TRANS_PARALLEL_APPLY:
1695  elog(DEBUG1, "applied %u changes in the streaming chunk",
1697 
1698  /*
1699  * By the time parallel apply worker is processing the changes in
1700  * the current streaming block, the leader apply worker may have
1701  * sent multiple streaming blocks. This can lead to parallel apply
1702  * worker start waiting even when there are more chunk of streams
1703  * in the queue. So, try to lock only if there is no message left
1704  * in the queue. See Locking Considerations atop
1705  * applyparallelworker.c.
1706  *
1707  * Note that here we have a race condition where we can start
1708  * waiting even when there are pending streaming chunks. This can
1709  * happen if the leader sends another streaming block and acquires
1710  * the stream lock again after the parallel apply worker checks
1711  * that there is no pending streaming block and before it actually
1712  * starts waiting on a lock. We can handle this case by not
1713  * allowing the leader to increment the stream block count during
1714  * the time parallel apply worker acquires the lock but it is not
1715  * clear whether that is worth the complexity.
1716  *
1717  * Now, if this missed chunk contains rollback to savepoint, then
1718  * there is a risk of deadlock which probably shouldn't happen
1719  * after restart.
1720  */
1722  break;
1723 
1724  default:
1725  elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1726  break;
1727  }
1728 
1729  in_streamed_transaction = false;
1731 
1732  /*
1733  * The parallel apply worker could be in a transaction in which case we
1734  * need to report the state as STATE_IDLEINTRANSACTION.
1735  */
1738  else
1740 
1742 }
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1623
@ STATE_IDLEINTRANSACTION
#define InvalidTransactionId
Definition: transam.h:31
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4841

References AccessExclusiveLock, Assert(), StringInfoData::data, DEBUG1, elog(), ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), in_streamed_transaction, InvalidTransactionId, IsTransactionOrTransactionBlock(), StringInfoData::len, LOGICAL_REP_MSG_STREAM_STOP, pa_decr_and_wait_stream_block(), pa_lock_stream(), pa_send_data(), pa_set_stream_apply_worker(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pgstat_report_activity(), reset_apply_error_context_info(), ParallelApplyWorkerInfo::shared, STATE_IDLE, STATE_IDLEINTRANSACTION, stream_stop_internal(), stream_write_change(), stream_xid, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and ParallelApplyWorkerShared::xid.

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 3114 of file worker.c.

3115 {
3116  bool cascade = false;
3117  bool restart_seqs = false;
3118  List *remote_relids = NIL;
3119  List *remote_rels = NIL;
3120  List *rels = NIL;
3121  List *part_rels = NIL;
3122  List *relids = NIL;
3123  List *relids_logged = NIL;
3124  ListCell *lc;
3125  LOCKMODE lockmode = AccessExclusiveLock;
3126 
3127  /*
3128  * Quick return if we are skipping data modification changes or handling
3129  * streamed transactions.
3130  */
3131  if (is_skipping_changes() ||
3133  return;
3134 
3136 
3137  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3138 
3139  foreach(lc, remote_relids)
3140  {
3141  LogicalRepRelId relid = lfirst_oid(lc);
3142  LogicalRepRelMapEntry *rel;
3143 
3144  rel = logicalrep_rel_open(relid, lockmode);
3145  if (!should_apply_changes_for_rel(rel))
3146  {
3147  /*
3148  * The relation can't become interesting in the middle of the
3149  * transaction so it's safe to unlock it.
3150  */
3151  logicalrep_rel_close(rel, lockmode);
3152  continue;
3153  }
3154 
3155  remote_rels = lappend(remote_rels, rel);
3157  rels = lappend(rels, rel->localrel);
3158  relids = lappend_oid(relids, rel->localreloid);
3160  relids_logged = lappend_oid(relids_logged, rel->localreloid);
3161 
3162  /*
3163  * Truncate partitions if we got a message to truncate a partitioned
3164  * table.
3165  */
3166  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3167  {
3168  ListCell *child;
3169  List *children = find_all_inheritors(rel->localreloid,
3170  lockmode,
3171  NULL);
3172 
3173  foreach(child, children)
3174  {
3175  Oid childrelid = lfirst_oid(child);
3176  Relation childrel;
3177 
3178  if (list_member_oid(relids, childrelid))
3179  continue;
3180 
3181  /* find_all_inheritors already got lock */
3182  childrel = table_open(childrelid, NoLock);
3183 
3184  /*
3185  * Ignore temp tables of other backends. See similar code in
3186  * ExecuteTruncate().
3187  */
3188  if (RELATION_IS_OTHER_TEMP(childrel))
3189  {
3190  table_close(childrel, lockmode);
3191  continue;
3192  }
3193 
3195  rels = lappend(rels, childrel);
3196  part_rels = lappend(part_rels, childrel);
3197  relids = lappend_oid(relids, childrelid);
3198  /* Log this relation only if needed for logical decoding */
3199  if (RelationIsLogicallyLogged(childrel))
3200  relids_logged = lappend_oid(relids_logged, childrelid);
3201  }
3202  }
3203  }
3204 
3205  /*
3206  * Even if we used CASCADE on the upstream primary we explicitly default
3207  * to replaying changes without further cascading. This might be later
3208  * changeable with a user specified option.
3209  */
3210  ExecuteTruncateGuts(rels,
3211  relids,
3212  relids_logged,
3213  DROP_RESTRICT,
3214  restart_seqs);
3215  foreach(lc, remote_rels)
3216  {
3217  LogicalRepRelMapEntry *rel = lfirst(lc);
3218 
3220  }
3221  foreach(lc, part_rels)
3222  {
3223  Relation rel = lfirst(lc);
3224 
3225  table_close(rel, NoLock);
3226  }
3227 
3229 }
List * lappend(List *list, void *datum)
Definition: list.c:338
List * lappend_oid(List *list, Oid datum)
Definition: list.c:374
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
int LOCKMODE
Definition: lockdefs.h:26
@ DROP_RESTRICT
Definition: parsenodes.h:2048
#define ACL_TRUNCATE
Definition: parsenodes.h:87
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:618
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:658
Definition: pg_list.h:54
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1789

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 2863 of file worker.c.

2867 {
2868  EState *estate = edata->estate;
2869  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2870  ResultRelInfo *relinfo = edata->targetRelInfo;
2871  Relation parentrel = relinfo->ri_RelationDesc;
2872  ModifyTableState *mtstate;
2873  PartitionTupleRouting *proute;
2874  ResultRelInfo *partrelinfo;
2875  Relation partrel;
2876  TupleTableSlot *remoteslot_part;
2877  TupleConversionMap *map;
2878  MemoryContext oldctx;
2879  LogicalRepRelMapEntry *part_entry = NULL;
2880  AttrMap *attrmap = NULL;
2881 
2882  /* ModifyTableState is needed for ExecFindPartition(). */
2883  edata->mtstate = mtstate = makeNode(ModifyTableState);
2884  mtstate->ps.plan = NULL;
2885  mtstate->ps.state = estate;
2886  mtstate->operation = operation;
2887  mtstate->resultRelInfo = relinfo;
2888 
2889  /* ... as is PartitionTupleRouting. */
2890  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2891 
2892  /*
2893  * Find the partition to which the "search tuple" belongs.
2894  */
2895  Assert(remoteslot != NULL);
2897  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2898  remoteslot, estate);
2899  Assert(partrelinfo != NULL);
2900  partrel = partrelinfo->ri_RelationDesc;
2901 
2902  /*
2903  * Check for supported relkind. We need this since partitions might be of
2904  * unsupported relkinds; and the set of partitions can change, so checking
2905  * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2906  */
2907  CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2909  RelationGetRelationName(partrel));
2910 
2911  /*
2912  * To perform any of the operations below, the tuple must match the
2913  * partition's rowtype. Convert if needed or just copy, using a dedicated
2914  * slot to store the tuple in any case.
2915  */
2916  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2917  if (remoteslot_part == NULL)
2918  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2919  map = ExecGetRootToChildMap(partrelinfo, estate);
2920  if (map != NULL)
2921  {
2922  attrmap = map->attrMap;
2923  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2924  remoteslot_part);
2925  }
2926  else
2927  {
2928  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2929  slot_getallattrs(remoteslot_part);
2930  }
2931  MemoryContextSwitchTo(oldctx);
2932 
2933  /* Check if we can do the update or delete on the leaf partition. */
2934  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2935  {
2936  part_entry = logicalrep_partition_open(relmapentry, partrel,
2937  attrmap);
2938  check_relation_updatable(part_entry);
2939  }
2940 
2941  switch (operation)
2942  {
2943  case CMD_INSERT:
2944  apply_handle_insert_internal(edata, partrelinfo,
2945  remoteslot_part);
2946  break;
2947 
2948  case CMD_DELETE:
2949  apply_handle_delete_internal(edata, partrelinfo,
2950  remoteslot_part,
2951  part_entry->localindexoid);
2952  break;
2953 
2954  case CMD_UPDATE:
2955 
2956  /*
2957  * For UPDATE, depending on whether or not the updated tuple
2958  * satisfies the partition's constraint, perform a simple UPDATE
2959  * of the partition or move the updated tuple into a different
2960  * suitable partition.
2961  */
2962  {
2963  TupleTableSlot *localslot;
2964  ResultRelInfo *partrelinfo_new;
2965  Relation partrel_new;
2966  bool found;
2967 
2968  /* Get the matching local tuple from the partition. */
2969  found = FindReplTupleInLocalRel(estate, partrel,
2970  &part_entry->remoterel,
2971  part_entry->localindexoid,
2972  remoteslot_part, &localslot);
2973  if (!found)
2974  {
2975  /*
2976  * The tuple to be updated could not be found. Do nothing
2977  * except for emitting a log message.
2978  *
2979  * XXX should this be promoted to ereport(LOG) perhaps?
2980  */
2981  elog(DEBUG1,
2982  "logical replication did not find row to be updated "
2983  "in replication target relation's partition \"%s\"",
2984  RelationGetRelationName(partrel));
2985  return;
2986  }
2987 
2988  /*
2989  * Apply the update to the local tuple, putting the result in
2990  * remoteslot_part.
2991  */
2993  slot_modify_data(remoteslot_part, localslot, part_entry,
2994  newtup);
2995  MemoryContextSwitchTo(oldctx);
2996 
2997  /*
2998  * Does the updated tuple still satisfy the current
2999  * partition's constraint?
3000  */
3001  if (!partrel->rd_rel->relispartition ||
3002  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3003  false))
3004  {
3005  /*
3006  * Yes, so simply UPDATE the partition. We don't call
3007  * apply_handle_update_internal() here, which would
3008  * normally do the following work, to avoid repeating some
3009  * work already done above to find the local tuple in the
3010  * partition.
3011  */
3012  EPQState epqstate;
3013 
3014  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
3015  ExecOpenIndices(partrelinfo, false);
3016 
3017  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3019  ACL_UPDATE);
3020  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3021  localslot, remoteslot_part);
3022  ExecCloseIndices(partrelinfo);
3023  EvalPlanQualEnd(&epqstate);
3024  }
3025  else
3026  {
3027  /* Move the tuple into the new partition. */
3028 
3029  /*
3030  * New partition will be found using tuple routing, which
3031  * can only occur via the parent table. We might need to
3032  * convert the tuple to the parent's rowtype. Note that
3033  * this is the tuple found in the partition, not the
3034  * original search tuple received by this function.
3035  */
3036  if (map)
3037  {
3038  TupleConversionMap *PartitionToRootMap =
3040  RelationGetDescr(parentrel));
3041 
3042  remoteslot =
3043  execute_attr_map_slot(PartitionToRootMap->attrMap,
3044  remoteslot_part, remoteslot);
3045  }
3046  else
3047  {
3048  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3049  slot_getallattrs(remoteslot);
3050  }
3051 
3052  /* Find the new partition. */
3054  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3055  proute, remoteslot,
3056  estate);
3057  MemoryContextSwitchTo(oldctx);
3058  Assert(partrelinfo_new != partrelinfo);
3059  partrel_new = partrelinfo_new->ri_RelationDesc;
3060 
3061  /* Check that new partition also has supported relkind. */
3062  CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3064  RelationGetRelationName(partrel_new));
3065 
3066  /* DELETE old tuple found in the old partition. */
3067  apply_handle_delete_internal(edata, partrelinfo,
3068  localslot,
3069  part_entry->localindexoid);
3070 
3071  /* INSERT new tuple into the new partition. */
3072 
3073  /*
3074  * Convert the replacement tuple to match the destination
3075  * partition rowtype.
3076  */
3078  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3079  if (remoteslot_part == NULL)
3080  remoteslot_part = table_slot_create(partrel_new,
3081  &estate->es_tupleTable);
3082  map = ExecGetRootToChildMap(partrelinfo_new, estate);
3083  if (map != NULL)
3084  {
3085  remoteslot_part = execute_attr_map_slot(map->attrMap,
3086  remoteslot,
3087  remoteslot_part);
3088  }
3089  else
3090  {
3091  remoteslot_part = ExecCopySlot(remoteslot_part,
3092  remoteslot);
3093  slot_getallattrs(remoteslot);
3094  }
3095  MemoryContextSwitchTo(oldctx);
3096  apply_handle_insert_internal(edata, partrelinfo_new,
3097  remoteslot_part);
3098  }
3099  }
3100  break;
3101 
3102  default:
3103  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3104  break;
3105  }
3106 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:918
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1779
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1263
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
@ CMD_UPDATE
Definition: nodes.h:277
#define makeNode(_type_)
Definition: nodes.h:176
#define ACL_UPDATE
Definition: parsenodes.h:85
#define RelationGetNamespace(relation)
Definition: rel.h:544
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:600
PartitionTupleRouting * proute
Definition: worker.c:231
ModifyTableState * mtstate
Definition: worker.c:230
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:662
CmdType operation
Definition: execnodes.h:1263
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1267
PlanState ps
Definition: execnodes.h:1262
Plan * plan
Definition: execnodes.h:1035
EState * state
Definition: execnodes.h:1037
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:575
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:521
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:400

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CheckSubscriptionRelkind(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog(), ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecGetRootToChildMap(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), get_namespace_name(), GetPerTupleMemoryContext, LogicalRepRelMapEntry::localindexoid, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 2346 of file worker.c.

2347 {
2348  LogicalRepTyp typ;
2349 
2351  return;
2352 
2353  logicalrep_read_typ(s, &typ);
2354 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:756

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 2529 of file worker.c.

2530 {
2531  LogicalRepRelMapEntry *rel;
2532  LogicalRepRelId relid;
2533  ApplyExecutionData *edata;
2534  EState *estate;
2535  LogicalRepTupleData oldtup;
2536  LogicalRepTupleData newtup;
2537  bool has_oldtup;
2538  TupleTableSlot *remoteslot;
2539  RTEPermissionInfo *target_perminfo;
2540  MemoryContext oldctx;
2541 
2542  /*
2543  * Quick return if we are skipping data modification changes or handling
2544  * streamed transactions.
2545  */
2546  if (is_skipping_changes() ||
2548  return;
2549 
2551 
2552  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2553  &newtup);
2554  rel = logicalrep_rel_open(relid, RowExclusiveLock);
2555  if (!should_apply_changes_for_rel(rel))
2556  {
2557  /*
2558  * The relation can't become interesting in the middle of the
2559  * transaction so it's safe to unlock it.
2560  */
2563  return;
2564  }
2565 
2566  /* Set relation for error callback */
2568 
2569  /* Check if we can do the update. */
2571 
2572  /* Initialize the executor state. */
2573  edata = create_edata_for_relation(rel);
2574  estate = edata->estate;
2575  remoteslot = ExecInitExtraTupleSlot(estate,
2576  RelationGetDescr(rel->localrel),
2577  &TTSOpsVirtual);
2578 
2579  /*
2580  * Populate updatedCols so that per-column triggers can fire, and so
2581  * executor can correctly pass down indexUnchanged hint. This could
2582  * include more columns than were actually changed on the publisher
2583  * because the logical replication protocol doesn't contain that
2584  * information. But it would for example exclude columns that only exist
2585  * on the subscriber, since we are not touching those.
2586  */
2587  target_perminfo = list_nth(estate->es_rteperminfos, 0);
2588  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2589  {
2591  int remoteattnum = rel->attrmap->attnums[i];
2592 
2593  if (!att->attisdropped && remoteattnum >= 0)
2594  {
2595  Assert(remoteattnum < newtup.ncols);
2596  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2597  target_perminfo->updatedCols =
2598  bms_add_member(target_perminfo->updatedCols,
2600  }
2601  }
2602 
2603  /* Build the search tuple. */
2605  slot_store_data(remoteslot, rel,
2606  has_oldtup ? &oldtup : &newtup);
2607  MemoryContextSwitchTo(oldctx);
2608 
2609  /* For a partitioned table, apply update to correct partition. */
2610  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2612  remoteslot, &newtup, CMD_UPDATE);
2613  else
2615  remoteslot, &newtup, rel->localindexoid);
2616 
2617  finish_edata(edata);
2618 
2619  /* Reset relation for error callback */
2621 
2623 
2625 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, Oid localindexoid)
Definition: worker.c:2633
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:755
int i
Definition: isn.c:73
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:97
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
AttrNumber * attnums
Definition: attmap.h:36
List * es_rteperminfos
Definition: execnodes.h:624
Bitmapset * updatedCols
Definition: parsenodes.h:1250
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_rteperminfos, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localindexoid, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RTEPermissionInfo::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
Oid  localindexoid 
)
static

Definition at line 2633 of file worker.c.

2638 {
2639  EState *estate = edata->estate;
2640  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2641  Relation localrel = relinfo->ri_RelationDesc;
2642  EPQState epqstate;
2643  TupleTableSlot *localslot;
2644  bool found;
2645  MemoryContext oldctx;
2646 
2647  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2648  ExecOpenIndices(relinfo, false);
2649 
2650  found = FindReplTupleInLocalRel(estate, localrel,
2651  &relmapentry->remoterel,
2652  localindexoid,
2653  remoteslot, &localslot);
2654  ExecClearTuple(remoteslot);
2655 
2656  /*
2657  * Tuple found.
2658  *
2659  * Note this will fail if there are other conflicting unique indexes.
2660  */
2661  if (found)
2662  {
2663  /* Process and store remote tuple in the slot */
2665  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2666  MemoryContextSwitchTo(oldctx);
2667 
2668  EvalPlanQualSetSlot(&epqstate, remoteslot);
2669 
2670  /* Do the actual update. */
2672  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2673  remoteslot);
2674  }
2675  else
2676  {
2677  /*
2678  * The tuple to be updated could not be found. Do nothing except for
2679  * emitting a log message.
2680  *
2681  * XXX should this be promoted to ereport(LOG) perhaps?
2682  */
2683  elog(DEBUG1,
2684  "logical replication did not find row to be updated "
2685  "in replication target relation \"%s\"",
2686  RelationGetRelationName(localrel));
2687  }
2688 
2689  /* Cleanup. */
2690  ExecCloseIndices(relinfo);
2691  EvalPlanQualEnd(&epqstate);
2692 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:471

References ACL_UPDATE, DEBUG1, elog(), ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2021 of file worker.c.

2023 {
2025  int nchanges;
2026  char path[MAXPGPATH];
2027  char *buffer = NULL;
2028  MemoryContext oldcxt;
2029  ResourceOwner oldowner;
2030  int fileno;
2031  off_t offset;
2032 
2033  if (!am_parallel_apply_worker())
2035 
2036  /* Make sure we have an open transaction */
2038 
2039  /*
2040  * Allocate file handle and memory required to process all the messages in
2041  * TopTransactionContext to avoid them getting reset after each message is
2042  * processed.
2043  */
2045 
2046  /* Open the spool file for the committed/prepared transaction */
2048  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2049 
2050  /*
2051  * Make sure the file is owned by the toplevel transaction so that the
2052  * file will not be accidentally closed when aborting a subtransaction.
2053  */
2054  oldowner = CurrentResourceOwner;
2056 
2057  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2058 
2059  CurrentResourceOwner = oldowner;
2060 
2061  buffer = palloc(BLCKSZ);
2062  initStringInfo(&s2);
2063 
2064  MemoryContextSwitchTo(oldcxt);
2065 
2066  remote_final_lsn = lsn;
2067 
2068  /*
2069  * Make sure the handle apply_dispatch methods are aware we're in a remote
2070  * transaction.
2071  */
2072  in_remote_transaction = true;
2074 
2076 
2077  /*
2078  * Read the entries one by one and pass them through the same logic as in
2079  * apply_dispatch.
2080  */
2081  nchanges = 0;
2082  while (true)
2083  {
2084  size_t nbytes;
2085  int len;
2086 
2088 
2089  /* read length of the on-disk record */
2090  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2091 
2092  /* have we reached end of the file? */
2093  if (nbytes == 0)
2094  break;
2095 
2096  /* do we have a correct length? */
2097  if (len <= 0)
2098  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2099  len, path);
2100 
2101  /* make sure we have sufficiently large buffer */
2102  buffer = repalloc(buffer, len);
2103 
2104  /* and finally read the data into the buffer */
2105  BufFileReadExact(stream_fd, buffer, len);
2106 
2107  BufFileTell(stream_fd, &fileno, &offset);
2108 
2109  /* copy the buffer to the stringinfo and call apply_dispatch */
2110  resetStringInfo(&s2);
2111  appendBinaryStringInfo(&s2, buffer, len);
2112 
2113  /* Ensure we are reading the data into our memory context. */
2115 
2116  apply_dispatch(&s2);
2117 
2119 
2120  MemoryContextSwitchTo(oldcxt);
2121 
2122  nchanges++;
2123 
2124  /*
2125  * It is possible the file has been closed because we have processed
2126  * the transaction end message like stream_commit in which case that
2127  * must be the last message.
2128  */
2129  if (!stream_fd)
2130  {
2131  ensure_last_message(stream_fileset, xid, fileno, offset);
2132  break;
2133  }
2134 
2135  if (nchanges % 1000 == 0)
2136  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2137  nchanges, path);
2138  }
2139 
2140  if (stream_fd)
2142 
2143  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2144  nchanges, path);
2145 
2146  return;
2147 }
MemoryContext ApplyMessageContext
Definition: worker.c:306
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4135
void apply_dispatch(StringInfo s)
Definition: worker.c:3236
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1989
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:651
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:830
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:661
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:314
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1456
void * palloc(Size size)
Definition: mcxt.c:1210
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:148
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), appendBinaryStringInfo(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog(), end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), resetStringInfo(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ apply_worker_exit()

static void apply_worker_exit ( void  )
static

Definition at line 3792 of file worker.c.

3793 {
3795  {
3796  /*
3797  * Don't stop the parallel apply worker as the leader will detect the
3798  * subscription parameter change and restart logical replication later
3799  * anyway. This also prevents the leader from reporting errors when
3800  * trying to communicate with a stopped parallel apply worker, which
3801  * would accidentally disable subscriptions if disable_on_error was
3802  * set.
3803  */
3804  return;
3805  }
3806 
3807  /*
3808  * Reset the last-start time for this apply worker so that the launcher
3809  * will restart it without waiting for wal_retrieve_retry_interval if the
3810  * subscription is still active, and so that we won't leak that hash table
3811  * entry if it isn't.
3812  */
3813  if (!am_tablesync_worker())
3815 
3816  proc_exit(0);
3817 }
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1031

References am_parallel_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), MyLogicalRepWorker, proc_exit(), and LogicalRepWorker::subid.

Referenced by InitializeApplyWorker(), and maybe_reread_subscription().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4473 of file worker.c.

4474 {
4475  int worker_slot = DatumGetInt32(main_arg);
4476  char originname[NAMEDATALEN];
4477  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4478  char *myslotname = NULL;
4480  int server_version;
4481 
4482  /* Attach to slot */
4483  logicalrep_worker_attach(worker_slot);
4484 
4485  /* Setup signal handling */
4487  pqsignal(SIGTERM, die);
4489 
4490  /*
4491  * We don't currently need any ResourceOwner in a walreceiver process, but
4492  * if we did, we could call CreateAuxProcessResourceOwner here.
4493  */
4494 
4495  /* Initialise stats to a sanish value */
4498 
4499  /* Load the libpq-specific functions */
4500  load_file("libpqwalreceiver", false);
4501 
4503 
4504  /* Connect to the origin and start the replication. */
4505  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4507 
4508  if (am_tablesync_worker())
4509  {
4510  start_table_sync(&origin_startpos, &myslotname);
4511 
4514  originname,
4515  sizeof(originname));
4516  set_apply_error_context_origin(originname);
4517  }
4518  else
4519  {
4520  /* This is the leader apply worker */
4521  RepOriginId originid;
4522  TimeLineID startpointTLI;
4523  char *err;
4524 
4525  myslotname = MySubscription->slotname;
4526 
4527  /*
4528  * This shouldn't happen if the subscription is enabled, but guard
4529  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
4530  * crash if slot is NULL.)
4531  */
4532  if (!myslotname)
4533  ereport(ERROR,
4534  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4535  errmsg("subscription has no replication slot set")));
4536 
4537  /* Setup replication origin tracking. */
4540  originname, sizeof(originname));
4541  originid = replorigin_by_name(originname, true);
4542  if (!OidIsValid(originid))
4543  originid = replorigin_create(originname);
4544  replorigin_session_setup(originid, 0);
4545  replorigin_session_origin = originid;
4546  origin_startpos = replorigin_session_get_progress(false);
4548 
4550  MySubscription->name, &err);
4551  if (LogRepWorkerWalRcvConn == NULL)
4552  ereport(ERROR,
4553  (errcode(ERRCODE_CONNECTION_FAILURE),
4554  errmsg("could not connect to the publisher: %s", err)));
4555 
4556  /*
4557  * We don't really use the output identify_system for anything but it
4558  * does some initializations on the upstream so let's still call it.
4559  */
4560  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4561 
4562  set_apply_error_context_origin(originname);
4563  }
4564 
4565  /*
4566  * Setup callback for syscache so that we know when something changes in
4567  * the subscription relation state.
4568  */
4571  (Datum) 0);
4572 
4573  /* Build logical replication streaming options. */
4574  options.logical = true;
4575  options.startpoint = origin_startpos;
4576  options.slotname = myslotname;
4577 
4579  options.proto.logical.proto_version =
4584 
4585  options.proto.logical.publication_names = MySubscription->publications;
4586  options.proto.logical.binary = MySubscription->binary;
4587 
4588  /*
4589  * Assign the appropriate option value for streaming option according to
4590  * the 'streaming' mode and the publisher's ability to support that mode.
4591  */
4592  if (server_version >= 160000 &&
4594  {
4595  options.proto.logical.streaming_str = "parallel";
4597  }
4598  else if (server_version >= 140000 &&
4600  {
4601  options.proto.logical.streaming_str = "on";
4603  }
4604  else
4605  {
4606  options.proto.logical.streaming_str = NULL;
4608  }
4609 
4610  options.proto.logical.twophase = false;
4611  options.proto.logical.origin = pstrdup(MySubscription->origin);
4612 
4613  if (!am_tablesync_worker())
4614  {
4615  /*
4616  * Even when the two_phase mode is requested by the user, it remains
4617  * as the tri-state PENDING until all tablesyncs have reached READY
4618  * state. Only then, can it become ENABLED.
4619  *
4620  * Note: If the subscription has no tables then leave the state as
4621  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4622  * work.
4623  */
4626  {
4627  /* Start streaming with two_phase enabled */
4628  options.proto.logical.twophase = true;
4630 
4635  }
4636  else
4637  {
4639  }
4640 
4641  ereport(DEBUG1,
4642  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4647  "?")));
4648  }
4649  else
4650  {
4651  /* Start normal logical streaming replication. */
4653  }
4654 
4655  /* Run the main loop. */
4656  start_apply(origin_startpos);
4657 
4658  proc_exit(0);
4659 }
void InitializeApplyWorker(void)
Definition: worker.c:4395
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:4319
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:457
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4362
void set_apply_error_context_origin(char *originname)
Definition: worker.c:4996
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:312
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
#define OidIsValid(objectId)
Definition: c.h:759
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void err(int eval, const char *fmt,...)
Definition: err.c:43
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void logicalrep_worker_attach(int slot)
Definition: launcher.c:674
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1624
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1234
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:110
static char ** options
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
uintptr_t Datum
Definition: postgres.h:64
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5660
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
bool AllTablesyncsReady(void)
Definition: tablesync.c:1573
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:272
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1598
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:422
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
#define walrcv_server_version(conn)
Definition: walreceiver.h:418
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:416
#define SIGHUP
Definition: win32_port.h:176
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References AllTablesyncsReady(), am_tablesync_worker(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32(), DEBUG1, die, elog(), ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, GetCurrentTimestamp(), InitializeApplyWorker(), invalidate_syncing_table_states(), InvalidOid, InvalidXLogRecPtr, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_PARALLEL, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::origin, LogicalRepWorker::parallel_apply, pqsignal(), proc_exit(), pstrdup(), Subscription::publications, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, set_apply_error_context_origin(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, start_apply(), start_table_sync(), StartTransactionCommand(), Subscription::stream, SUBSCRIPTIONRELMAP, Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 4964 of file worker.c.

4965 {
4966  if (isCommit && on_commit_wakeup_workers_subids != NIL)
4967  {
4968  ListCell *lc;
4969 
4970  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
4971  foreach(lc, on_commit_wakeup_workers_subids)
4972  {
4973  Oid subid = lfirst_oid(lc);
4974  List *workers;
4975  ListCell *lc2;
4976 
4977  workers = logicalrep_workers_find(subid, true);
4978  foreach(lc2, workers)
4979  {
4980  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
4981 
4983  }
4984  }
4985  LWLockRelease(LogicalRepWorkerLock);
4986  }
4987 
4988  /* The List storage will be reclaimed automatically in xact cleanup. */
4990 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:317
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:663
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:281
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:116

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 4135 of file worker.c.

4136 {
4137  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4138 }
#define snprintf
Definition: port.h:238

References MAXPGPATH, and snprintf.

Referenced by apply_spooled_messages(), ensure_last_message(), stream_abort_internal(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 2488 of file worker.c.

2489 {
2490  /*
2491  * For partitioned tables, we only need to care if the target partition is
2492  * updatable (aka has PK or RI defined for it).
2493  */
2494  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2495  return;
2496 
2497  /* Updatable, no error. */
2498  if (rel->updatable)
2499  return;
2500 
2501  /*
2502  * We are in error mode so it's fine this is somewhat slow. It's better to
2503  * give user correct error.
2504  */
2506  {
2507  ereport(ERROR,
2508  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2509  errmsg("publisher did not send replica identity column "
2510  "expected by the logical replication target relation \"%s.%s\"",
2511  rel->remoterel.nspname, rel->remoterel.relname)));
2512  }
2513 
2514  ereport(ERROR,
2515  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2516  errmsg("logical replication target relation \"%s.%s\" has "
2517  "neither REPLICA IDENTITY index nor PRIMARY "
2518  "KEY and published relation does not have "
2519  "REPLICA IDENTITY FULL",
2520  rel->remoterel.nspname, rel->remoterel.relname)));
2521 }
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:857

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

Definition at line 4281 of file worker.c.

4282 {
4283  if (subxact_data.subxacts)
4285 
4286  subxact_data.subxacts = NULL;
4288  subxact_data.nsubxacts = 0;
4290 }
static ApplySubXactData subxact_data
Definition: worker.c:370
void pfree(void *pointer)
Definition: mcxt.c:1436
uint32 nsubxacts
Definition: worker.c:364
uint32 nsubxacts_max
Definition: worker.c:365
SubXactInfo * subxacts
Definition: worker.c:367
TransactionId subxact_last
Definition: worker.c:366

References InvalidTransactionId, ApplySubXactData::nsubxacts, ApplySubXactData::nsubxacts_max, pfree(), subxact_data, ApplySubXactData::subxact_last, and ApplySubXactData::subxacts.

Referenced by stream_abort_internal(), and subxact_info_write().

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 4773 of file worker.c.

4774 {
4775  Relation rel;
4776  Form_pg_subscription subform;
4777  HeapTuple tup;
4778  XLogRecPtr myskiplsn = MySubscription->skiplsn;
4779  bool started_tx = false;
4780 
4782  return;
4783 
4784  if (!IsTransactionState())
4785  {
4787  started_tx = true;
4788  }
4789 
4790  /*
4791  * Protect subskiplsn of pg_subscription from being concurrently updated
4792  * while clearing it.
4793  */
4794  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4795  AccessShareLock);
4796 
4797  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4798 
4799  /* Fetch the existing tuple. */
4802 
4803  if (!HeapTupleIsValid(tup))
4804  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4805 
4806  subform = (Form_pg_subscription) GETSTRUCT(tup);
4807 
4808  /*
4809  * Clear the subskiplsn. If the user has already changed subskiplsn before
4810  * clearing it we don't update the catalog and the replication origin
4811  * state won't get advanced. So in the worst case, if the server crashes
4812  * before sending an acknowledgment of the flush position the transaction
4813  * will be sent again and the user needs to set subskiplsn again. We can
4814  * reduce the possibility by logging a replication origin WAL record to
4815  * advance the origin LSN instead but there is no way to advance the
4816  * origin timestamp and it doesn't seem to be worth doing anything about
4817  * it since it's a very rare case.
4818  */
4819  if (subform->subskiplsn == myskiplsn)
4820  {
4821  bool nulls[Natts_pg_subscription];
4822  bool replaces[Natts_pg_subscription];
4823  Datum values[Natts_pg_subscription];
4824 
4825  memset(values, 0, sizeof(values));
4826  memset(nulls, false, sizeof(nulls));
4827  memset(replaces, false, sizeof(replaces));
4828 
4829  /* reset subskiplsn */
4830  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4831  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4832 
4833  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4834  replaces);
4835  CatalogTupleUpdate(rel, &tup->t_self, tup);
4836 
4837  if (myskiplsn != finish_lsn)
4838  ereport(WARNING,
4839  errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4840  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4841  LSN_FORMAT_ARGS(finish_lsn),
4842  LSN_FORMAT_ARGS(myskiplsn)));
4843  }
4844 
4845  heap_freetuple(tup);
4846  table_close(rel, NoLock);
4847 
4848  if (started_tx)
4850 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define likely(x)
Definition: c.h:294
int errdetail(const char *fmt,...)
Definition: elog.c:1202
#define WARNING
Definition: elog.h:36
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessShareLock
Definition: lockdefs.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
FormData_pg_subscription * Form_pg_subscription
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References AccessShareLock, am_parallel_apply_worker(), CatalogTupleUpdate(), CommitTransactionCommand(), elog(), ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum(), MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 672 of file worker.c.

673 {
674  ApplyExecutionData *edata;
675  EState *estate;
676  RangeTblEntry *rte;
677  List *perminfos = NIL;
678  ResultRelInfo *resultRelInfo;
679 
680  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
681  edata->targetRel = rel;
682 
683  edata->estate = estate = CreateExecutorState();
684 
685  rte = makeNode(RangeTblEntry);
686  rte->rtekind = RTE_RELATION;
687  rte->relid = RelationGetRelid(rel->localrel);
688  rte->relkind = rel->localrel->rd_rel->relkind;
690 
691  addRTEPermissionInfo(&perminfos, rte);
692 
693  ExecInitRangeTable(estate, list_make1(rte), perminfos);
694 
695  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
696 
697  /*
698  * Use Relation opened by logicalrep_rel_open() instead of opening it
699  * again.
700  */
701  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
702 
703  /*
704  * We put the ResultRelInfo in the es_opened_result_relations list, even
705  * though we don't populate the es_result_relations array. That's a bit
706  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
707  *
708  * ExecOpenIndices() is not called here either, each execution path doing
709  * an apply operation being responsible for that.
710  */
712  lappend(estate->es_opened_result_relations, resultRelInfo);
713 
714  estate->es_output_cid = GetCurrentCommandId(true);
715 
716  /* Prepare to catch AFTER triggers. */
718 
719  /* other fields of edata remain NULL for now */
720 
721  return edata;
722 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1188
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:759
EState * CreateExecutorState(void)
Definition: execUtils.c:93
void * palloc0(Size size)
Definition: mcxt.c:1241
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1014
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:503
List * es_opened_result_relations
Definition: execnodes.h:638
CommandId es_output_cid
Definition: execnodes.h:632
RTEKind rtekind
Definition: parsenodes.h:1033
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4980
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818

References AccessShareLock, addRTEPermissionInfo(), AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, NIL, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

static void DisableSubscriptionAndExit ( void  )
static

Definition at line 4666 of file worker.c.

4667 {
4668  /*
4669  * Emit the error message, and recover from the error state to an idle
4670  * state
4671  */
4672  HOLD_INTERRUPTS();
4673 
4674  EmitErrorReport();
4676  FlushErrorState();
4677 
4679 
4680  /* Report the worker failed during either table synchronization or apply */
4682  !am_tablesync_worker());
4683 
4684  /* Disable the subscription */
4688 
4689  /* Ensure we remove no-longer-useful entry for worker's start time */
4692 
4693  /* Notify the subscription has been disabled and exit */
4694  ereport(LOG,
4695  errmsg("subscription \"%s\" has been disabled because of an error",
4696  MySubscription->name));
4697 
4698  proc_exit(0);
4699 }
void EmitErrorReport(void)
Definition: elog.c:1669
void FlushErrorState(void)
Definition: elog.c:1825
#define LOG
Definition: elog.h:31
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4719

References AbortOutOfAnyTransaction(), am_parallel_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ ensure_last_message()

static void ensure_last_message ( FileSet stream_fileset,
TransactionId  xid,
int  fileno,
off_t  offset 
)
static

Definition at line 1989 of file worker.c.

1991 {
1992  char path[MAXPGPATH];
1993  BufFile *fd;
1994  int last_fileno;
1995  off_t last_offset;
1996 
1998 
2000 
2002 
2003  fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2004 
2005  BufFileSeek(fd, 0, 0, SEEK_END);
2006  BufFileTell(fd, &last_fileno, &last_offset);
2007 
2008  BufFileClose(fd);
2009 
2011 
2012  if (last_fileno != fileno || last_offset != offset)
2013  elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2014  path);
2015 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:737
void BufFileClose(BufFile *file)
Definition: buffile.c:407
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References Assert(), begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileSeek(), BufFileTell(), changes_filename(), elog(), end_replication_step(), ERROR, fd(), IsTransactionState(), MAXPGPATH, MyLogicalRepWorker, and LogicalRepWorker::subid.

Referenced by apply_spooled_messages().

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( EState estate,
Relation  localrel,
LogicalRepRelation remoterel,
Oid  localidxoid,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2829 of file worker.c.

2834 {
2835  bool found;
2836 
2837  /*
2838  * Regardless of the top-level operation, we're performing a read here, so
2839  * check for SELECT privileges.
2840  */
2841  TargetPrivilegesCheck(localrel, ACL_SELECT);
2842 
2843  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2844 
2845  Assert(OidIsValid(localidxoid) ||
2846  (remoterel->replident == REPLICA_IDENTITY_FULL));
2847 
2848  if (OidIsValid(localidxoid))
2849  found = RelationFindReplTupleByIndex(localrel, localidxoid,
2851  remoteslot, *localslot);
2852  else
2853  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2854  remoteslot, *localslot);
2855 
2856  return found;
2857 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:84

References ACL_SELECT, Assert(), EState::es_tupleTable, LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), and TargetPrivilegesCheck().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 729 of file worker.c.

730 {
731  EState *estate = edata->estate;
732 
733  /* Handle any queued AFTER triggers. */
734  AfterTriggerEndQuery(estate);
735 
736  /* Shut down tuple routing, if any was done. */
737  if (edata->proute)
738  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
739 
740  /*
741  * Cleanup. It might seem that we should call ExecCloseResultRelations()
742  * here, but we intentionally don't. It would close the rel we added to
743  * es_opened_result_relations above, which is wrong because we took no
744  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
745  * any other relations opened during execution.
746  */
747  ExecResetTupleTable(estate->es_tupleTable, false);
748  FreeExecutorState(estate);
749  pfree(edata);
750 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
void FreeExecutorState(EState *estate)
Definition: execUtils.c:194
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5000

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 3356 of file worker.c.

3358 {
3359  dlist_mutable_iter iter;
3360  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3361 
3363  *flush = InvalidXLogRecPtr;
3364 
3366  {
3367  FlushPosition *pos =
3368  dlist_container(FlushPosition, node, iter.cur);
3369 
3370  *write = pos->remote_end;
3371 
3372  if (pos->local_end <= local_flush)
3373  {
3374  *flush = pos->remote_end;
3375  dlist_delete(iter.cur);
3376  pfree(pos);
3377  }
3378  else
3379  {
3380  /*
3381  * Don't want to uselessly iterate over the rest of the list which
3382  * could potentially be long. Instead get the last element and
3383  * grab the write position from there.
3384  */
3385  pos = dlist_tail_element(FlushPosition, node,
3386  &lsn_mapping);
3387  *write = pos->remote_end;
3388  *have_pending_txes = true;
3389  return;
3390  }
3391  }
3392 
3393  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3394 }
static dlist_head lsn_mapping
Definition: worker.c:220
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:612
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:217
XLogRecPtr local_end
Definition: worker.c:216
dlist_node * cur
Definition: ilist.h:200
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6073

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ get_transaction_apply_action()

static TransApplyAction get_transaction_apply_action ( TransactionId  xid,
ParallelApplyWorkerInfo **  winfo 
)
static

Definition at line 5011 of file worker.c.

5012 {
5013  *winfo = NULL;
5014 
5016  {
5017  return TRANS_PARALLEL_APPLY;
5018  }
5019 
5020  /*
5021  * If we are processing this transaction using a parallel apply worker then
5022  * either we send the changes to the parallel worker or if the worker is busy
5023  * then serialize the changes to the file which will later be processed by
5024  * the parallel worker.
5025  */
5026  *winfo = pa_find_worker(xid);
5027 
5028  if (*winfo && (*winfo)->serialize_changes)
5029  {
5031  }
5032  else if (*winfo)
5033  {
5035  }
5036 
5037  /*
5038  * If there is no parallel worker involved to process this transaction then
5039  * we either directly apply the change or serialize it to a file which will
5040  * later be applied when the transaction finish message is processed.
5041  */
5042  else if (in_streamed_transaction)
5043  {
5044  return TRANS_LEADER_SERIALIZE;
5045  }
5046  else
5047  {
5048  return TRANS_LEADER_APPLY;
5049  }
5050 }
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

References am_parallel_apply_worker(), in_streamed_transaction, pa_find_worker(), ParallelApplyWorkerInfo::serialize_changes, TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, and TRANS_PARALLEL_APPLY.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ get_worker_name()

static const char* get_worker_name ( void  )
static

Definition at line 438 of file worker.c.

439 {
440  if (am_tablesync_worker())
441  return _("logical replication table synchronization worker");
442  else if (am_parallel_apply_worker())
443  return _("logical replication parallel apply worker");
444  else
445  return _("logical replication apply worker");
446 }
#define _(x)
Definition: elog.c:91

References _, am_parallel_apply_worker(), and am_tablesync_worker().

Referenced by InitializeApplyWorker(), and maybe_reread_subscription().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 579 of file worker.c.

580 {
581  TransactionId current_xid;
583  TransApplyAction apply_action;
584  StringInfoData original_msg;
585 
586  apply_action = get_transaction_apply_action(stream_xid, &winfo);
587 
588  /* not in streaming mode */
589  if (apply_action == TRANS_LEADER_APPLY)
590  return false;
591 
593 
594  /*
595  * The parallel apply worker needs the xid in this message to decide
596  * whether to define a savepoint, so save the original message that has
597  * not moved the cursor after the xid. We will serialize this message to a
598  * file in PARTIAL_SERIALIZE mode.
599  */
600  original_msg = *s;
601 
602  /*
603  * We should have received XID of the subxact as the first part of the
604  * message, so extract it.
605  */
606  current_xid = pq_getmsgint(s, 4);
607 
608  if (!TransactionIdIsValid(current_xid))
609  ereport(ERROR,
610  (errcode(ERRCODE_PROTOCOL_VIOLATION),
611  errmsg_internal("invalid transaction ID in streamed replication transaction")));
612 
613  switch (apply_action)
614  {
616  Assert(stream_fd);
617 
618  /* Add the new subxact to the array (unless already there). */
619  subxact_info_add(current_xid);
620 
621  /* Write the change to the current file */
623  return true;
624 
626  Assert(winfo);
627 
628  /*
629  * XXX The publisher side doesn't always send relation/type update
630  * messages after the streaming transaction, so also update the
631  * relation/type in leader apply worker. See function
632  * cleanup_rel_sync_cache.
633  */
634  if (pa_send_data(winfo, s->len, s->data))
635  return (action != LOGICAL_REP_MSG_RELATION &&
637 
638  /*
639  * Switch to serialize mode when we are not able to send the
640  * change to parallel apply worker.
641  */
642  pa_switch_to_partial_serialize(winfo, false);
643 
644  /* fall through */
646  stream_write_change(action, &original_msg);
647 
648  /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
649  return (action != LOGICAL_REP_MSG_RELATION &&
651 
654 
655  /* Define a savepoint for a subxact if needed. */
656  pa_start_subtrans(current_xid, stream_xid);
657  return false;
658 
659  default:
660  Assert(false);
661  return false; /* silence compiler warning */
662  }
663 }
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
static void subxact_info_add(TransactionId xid)
Definition: worker.c:4050
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418

References generate_unaccent_rules::action, Assert(), StringInfoData::data, ereport, errcode(), errmsg_internal(), ERROR, get_transaction_apply_action(), StringInfoData::len, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_TYPE, pa_send_data(), pa_start_subtrans(), pa_switch_to_partial_serialize(), parallel_stream_nchanges, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), TRANS_LEADER_APPLY, TRANS_LEADER_PARTIAL_SERIALIZE, TRANS_LEADER_SEND_TO_PARALLEL, TRANS_LEADER_SERIALIZE, TRANS_PARALLEL_APPLY, and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ InitializeApplyWorker()

void InitializeApplyWorker ( void  )

Definition at line 4395 of file worker.c.

4396 {
4397  MemoryContext oldctx;
4398 
4399  /* Run as replica session replication role. */
4400  SetConfigOption("session_replication_role", "replica",
4402 
4403  /* Connect to our database. */
4406  0);
4407 
4408  /*
4409  * Set always-secure search path, so malicious users can't redirect user
4410  * code (e.g. pg_index.indexprs).
4411  */
4412  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4413 
4414  /* Load the subscription into persistent memory context. */
4416  "ApplyContext",
4420 
4422  if (!MySubscription)
4423  {
4424  ereport(LOG,
4425  /* translator: %s is the name of logical replication worker */
4426  (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
4428 
4429  /* Ensure we remove no-longer-useful entry for worker's start time */
4432  proc_exit(0);
4433  }
4434 
4435  MySubscriptionValid = true;
4436  MemoryContextSwitchTo(oldctx);
4437 
4438  if (!MySubscription->enabled)
4439  {
4440  ereport(LOG,
4441  /* translator: first %s is the name of logical replication worker */
4442  (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
4444 
4446  }
4447 
4448  /* Setup synchronous commit according to the user's wishes */
4449  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4451 
4452  /* Keep us informed about subscription changes. */
4455  (Datum) 0);
4456 
4457  if (am_tablesync_worker())
4458  ereport(LOG,
4459  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4462  else
4463  ereport(LOG,
4464  /* translator: first %s is the name of logical replication worker */
4465  (errmsg("%s for subscription \"%s\" has started",
4467 
4469 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3935
static void apply_worker_exit(void)
Definition: worker.c:3792
MemoryContext ApplyContext
Definition: worker.c:307
static bool MySubscriptionValid
Definition: worker.c:315
static const char * get_worker_name(void)
Definition: worker.c:438
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4176
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
Subscription * GetSubscription(Oid subid, bool missing_ok)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5627

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_parallel_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), get_worker_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4714 of file worker.c.

4715 {
4717 }
bool IsLogicalWorker(void)
Definition: worker.c:4705

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4705 of file worker.c.

4706 {
4707  return MyLogicalRepWorker != NULL;
4708 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 3442 of file worker.c.

3443 {
3444  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3445  bool ping_sent = false;
3446  TimeLineID tli;
3447  ErrorContextCallback errcallback;
3448 
3449  /*
3450  * Init the ApplyMessageContext which we clean up after each replication
3451  * protocol message.
3452  */
3454  "ApplyMessageContext",
3456 
3457  /*
3458  * This memory context is used for per-stream data when the streaming mode
3459  * is enabled. This context is reset on each stream stop.
3460  */
3462  "LogicalStreamingContext",
3464 
3465  /* mark as idle, before starting to loop */
3467 
3468  /*
3469  * Push apply error context callback. Fields will be filled while applying
3470  * a change.
3471  */
3472  errcallback.callback = apply_error_callback;
3473  errcallback.previous = error_context_stack;
3474  error_context_stack = &errcallback;
3476 
3477  /* This outer loop iterates once per wait. */
3478  for (;;)
3479  {
3481  int rc;
3482  int len;
3483  char *buf = NULL;
3484  bool endofstream = false;
3485  long wait_time;
3486 
3488 
3490 
3492 
3493  if (len != 0)
3494  {
3495  /* Loop to process all available data (without blocking). */
3496  for (;;)
3497  {
3499 
3500  if (len == 0)
3501  {
3502  break;
3503  }
3504  else if (len < 0)
3505  {
3506  ereport(LOG,
3507  (errmsg("data stream from publisher has ended")));
3508  endofstream = true;
3509  break;
3510  }
3511  else
3512  {
3513  int c;
3514  StringInfoData s;
3515 
3516  /* Reset timeout. */
3517  last_recv_timestamp = GetCurrentTimestamp();
3518  ping_sent = false;
3519 
3520  /* Ensure we are reading the data into our memory context. */
3522 
3523  s.data = buf;
3524  s.len = len;
3525  s.cursor = 0;
3526  s.maxlen = -1;
3527 
3528  c = pq_getmsgbyte(&s);
3529 
3530  if (c == 'w')
3531  {
3532  XLogRecPtr start_lsn;
3533  XLogRecPtr end_lsn;
3534  TimestampTz send_time;
3535 
3536  start_lsn = pq_getmsgint64(&s);
3537  end_lsn = pq_getmsgint64(&s);
3538  send_time = pq_getmsgint64(&s);
3539 
3540  if (last_received < start_lsn)
3541  last_received = start_lsn;
3542 
3543  if (last_received < end_lsn)
3544  last_received = end_lsn;
3545 
3546  UpdateWorkerStats(last_received, send_time, false);
3547 
3548  apply_dispatch(&s);
3549  }
3550  else if (c == 'k')
3551  {
3552  XLogRecPtr end_lsn;
3554  bool reply_requested;
3555 
3556  end_lsn = pq_getmsgint64(&s);
3557  timestamp = pq_getmsgint64(&s);
3558  reply_requested = pq_getmsgbyte(&s);
3559 
3560  if (last_received < end_lsn)
3561  last_received = end_lsn;
3562 
3563  send_feedback(last_received, reply_requested, false);
3564  UpdateWorkerStats(last_received, timestamp, true);
3565  }
3566  /* other message types are purposefully ignored */
3567 
3569  }
3570 
3572  }
3573  }
3574 
3575  /* confirm all writes so far */
3576  send_feedback(last_received, false, false);
3577 
3579  {
3580  /*
3581  * If we didn't get any transactions for a while there might be
3582  * unconsumed invalidation messages in the queue, consume them
3583  * now.
3584  */
3587 
3588  /* Process any table synchronization changes. */
3589  process_syncing_tables(last_received);
3590  }
3591 
3592  /* Cleanup the memory. */
3595 
3596  /* Check if we need to exit the streaming loop. */
3597  if (endofstream)
3598  break;
3599 
3600  /*
3601  * Wait for more data or latch. If we have unflushed transactions,
3602  * wake up after WalWriterDelay to see if they've been flushed yet (in
3603  * which case we should send a feedback message). Otherwise, there's
3604  * no particular urgency about waking up unless we get data or a
3605  * signal.
3606  */
3607  if (!dlist_is_empty(&lsn_mapping))
3608  wait_time = WalWriterDelay;
3609  else
3610  wait_time = NAPTIME_PER_CYCLE;
3611 
3615  fd, wait_time,
3617 
3618  if (rc & WL_LATCH_SET)
3619  {
3622  }
3623 
3624  if (ConfigReloadPending)
3625  {
3626  ConfigReloadPending = false;
3628  }
3629 
3630  if (rc & WL_TIMEOUT)
3631  {
3632  /*
3633  * We didn't receive anything new. If we haven't heard anything
3634  * from the server for more than wal_receiver_timeout / 2, ping
3635  * the server. Also, if it's been longer than
3636  * wal_receiver_status_interval since the last update we sent,
3637  * send a status update to the primary anyway, to report any
3638  * progress in applying WAL.
3639  */
3640  bool requestReply = false;
3641 
3642  /*
3643  * Check if time since last receive from primary has reached the
3644  * configured limit.
3645  */
3646  if (wal_receiver_timeout > 0)
3647  {
3649  TimestampTz timeout;
3650 
3651  timeout =
3652  TimestampTzPlusMilliseconds(last_recv_timestamp,
3654 
3655  if (now >= timeout)
3656  ereport(ERROR,
3657  (errcode(ERRCODE_CONNECTION_FAILURE),
3658  errmsg("terminating logical replication worker due to timeout")));
3659 
3660  /* Check to see if it's time for a ping. */
3661  if (!ping_sent)
3662  {
3663  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3664  (wal_receiver_timeout / 2));
3665  if (now >= timeout)
3666  {
3667  requestReply = true;
3668  ping_sent = true;
3669  }
3670  }
3671  }
3672 
3673  send_feedback(last_received, requestReply, requestReply);
3674 
3675  /*
3676  * Force reporting to ensure long idle periods don't lead to
3677  * arbitrarily delayed stats. Stats can only be reported outside
3678  * of (implicit or explicit) transactions. That shouldn't lead to
3679  * stats being delayed for long, because transactions are either
3680  * sent as a whole on commit or streamed. Streamed transactions
3681  * are spilled to disk and applied on commit.
3682  */
3683  if (!IsTransactionState())
3684  pgstat_report_stat(true);
3685  }
3686  }
3687 
3688  /* Pop the error context stack */
3689  error_context_stack = errcallback.previous;
3691 
3692  /* All done */
3694 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:3426
#define NAPTIME_PER_CYCLE
Definition: worker.c:211
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:304
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:3703
void apply_error_callback(void *arg)
Definition: worker.c:4854
static MemoryContext LogicalStreamingContext
Definition: worker.c:310
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1546
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:95
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:540
void ResetLatch(Latch *latch)
Definition: latch.c:699
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:70
static char * buf
Definition: pg_test_fsync.c:67
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
char * c
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
Definition: wait_event.h:43
int wal_receiver_timeout
Definition: walreceiver.c:91
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:424
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:426
int WalWriterDelay
Definition: walwriter.c:70

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), apply_error_context_stack, ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, IsTransactionState(), StringInfoData::len, len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3823 of file worker.c.

3824 {
3825  MemoryContext oldctx;
3827  bool started_tx = false;
3828 
3829  /* When cache state is valid there is nothing to do here. */
3830  if (MySubscriptionValid)
3831  return;
3832 
3833  /* This function might be called inside or outside of transaction. */
3834  if (!IsTransactionState())
3835  {
3837  started_tx = true;
3838  }
3839 
3840  /* Ensure allocations in permanent context. */
3842