PostgreSQL Source Code  git master
worker.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
Include dependency graph for worker.c:

Go to the source code of this file.

Data Structures

struct  FlushPosition
 
struct  ApplyExecutionData
 
struct  ApplyErrorCallbackArg
 
struct  SubXactInfo
 
struct  ApplySubXactData
 

Macros

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */
 
#define is_skipping_changes()   (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 

Typedefs

typedef struct FlushPosition FlushPosition
 
typedef struct ApplyExecutionData ApplyExecutionData
 
typedef struct ApplyErrorCallbackArg ApplyErrorCallbackArg
 
typedef struct SubXactInfo SubXactInfo
 
typedef struct ApplySubXactData ApplySubXactData
 

Functions

static void subxact_filename (char *path, Oid subid, TransactionId xid)
 
static void changes_filename (char *path, Oid subid, TransactionId xid)
 
static void subxact_info_write (Oid subid, TransactionId xid)
 
static void subxact_info_read (Oid subid, TransactionId xid)
 
static void subxact_info_add (TransactionId xid)
 
static void cleanup_subxact_info (void)
 
static void stream_cleanup_files (Oid subid, TransactionId xid)
 
static void stream_open_file (Oid subid, TransactionId xid, bool first)
 
static void stream_write_change (char action, StringInfo s)
 
static void stream_close_file (void)
 
static void send_feedback (XLogRecPtr recvpos, bool force, bool requestReply)
 
static void store_flush_position (XLogRecPtr remote_lsn)
 
static void maybe_reread_subscription (void)
 
static void DisableSubscriptionAndExit (void)
 
static void apply_dispatch (StringInfo s)
 
static void apply_handle_commit_internal (LogicalRepCommitData *commit_data)
 
static void apply_handle_insert_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static void apply_handle_update_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
 
static void apply_handle_delete_internal (ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
 
static bool FindReplTupleInLocalRel (EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
 
static void apply_handle_tuple_routing (ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
 
static void TwoPhaseTransactionGid (Oid subid, TransactionId xid, char *gid, int szgid)
 
static void apply_spooled_messages (TransactionId xid, XLogRecPtr lsn)
 
static void maybe_start_skipping_changes (XLogRecPtr finish_lsn)
 
static void stop_skipping_changes (void)
 
static void clear_subscription_skip_lsn (XLogRecPtr finish_lsn)
 
static void apply_error_callback (void *arg)
 
static void set_apply_error_context_xact (TransactionId xid, XLogRecPtr lsn)
 
static void reset_apply_error_context_info (void)
 
static bool should_apply_changes_for_rel (LogicalRepRelMapEntry *rel)
 
static void begin_replication_step (void)
 
static void end_replication_step (void)
 
static bool handle_streamed_transaction (LogicalRepMsgType action, StringInfo s)
 
static ApplyExecutionDatacreate_edata_for_relation (LogicalRepRelMapEntry *rel)
 
static void finish_edata (ApplyExecutionData *edata)
 
static void slot_fill_defaults (LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
 
static void slot_store_data (TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void slot_modify_data (TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
 
static void apply_handle_begin (StringInfo s)
 
static void apply_handle_commit (StringInfo s)
 
static void apply_handle_begin_prepare (StringInfo s)
 
static void apply_handle_prepare_internal (LogicalRepPreparedTxnData *prepare_data)
 
static void apply_handle_prepare (StringInfo s)
 
static void apply_handle_commit_prepared (StringInfo s)
 
static void apply_handle_rollback_prepared (StringInfo s)
 
static void apply_handle_stream_prepare (StringInfo s)
 
static void apply_handle_origin (StringInfo s)
 
static void apply_handle_stream_start (StringInfo s)
 
static void apply_handle_stream_stop (StringInfo s)
 
static void apply_handle_stream_abort (StringInfo s)
 
static void apply_handle_stream_commit (StringInfo s)
 
static void apply_handle_relation (StringInfo s)
 
static void apply_handle_type (StringInfo s)
 
static Oid GetRelationIdentityOrPK (Relation rel)
 
static void TargetPrivilegesCheck (Relation rel, AclMode mode)
 
static void apply_handle_insert (StringInfo s)
 
static void check_relation_updatable (LogicalRepRelMapEntry *rel)
 
static void apply_handle_update (StringInfo s)
 
static void apply_handle_delete (StringInfo s)
 
static void apply_handle_truncate (StringInfo s)
 
static void get_flush_position (XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
 
static void UpdateWorkerStats (XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 
static void LogicalRepApplyLoop (XLogRecPtr last_received)
 
static void subscription_change_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **myslotname)
 
static void start_apply (XLogRecPtr origin_startpos)
 
void ApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 

Variables

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping)
 
static ApplyErrorCallbackArg apply_error_callback_arg
 
static MemoryContext ApplyMessageContext = NULL
 
MemoryContext ApplyContext = NULL
 
static MemoryContext LogicalStreamingContext = NULL
 
WalReceiverConnLogRepWorkerWalRcvConn = NULL
 
SubscriptionMySubscription = NULL
 
static bool MySubscriptionValid = false
 
bool in_remote_transaction = false
 
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr
 
static bool in_streamed_transaction = false
 
static TransactionId stream_xid = InvalidTransactionId
 
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr
 
static BufFilestream_fd = NULL
 
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}
 

Macro Definition Documentation

◆ is_skipping_changes

#define is_skipping_changes ( )    (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

Definition at line 277 of file worker.c.

◆ NAPTIME_PER_CYCLE

#define NAPTIME_PER_CYCLE   1000 /* max sleep time between cycles (1s) */

Definition at line 199 of file worker.c.

Typedef Documentation

◆ ApplyErrorCallbackArg

◆ ApplyExecutionData

◆ ApplySubXactData

◆ FlushPosition

typedef struct FlushPosition FlushPosition

◆ SubXactInfo

typedef struct SubXactInfo SubXactInfo

Function Documentation

◆ apply_dispatch()

static void apply_dispatch ( StringInfo  s)
static

Definition at line 2471 of file worker.c.

2472 {
2474  LogicalRepMsgType saved_command;
2475 
2476  /*
2477  * Set the current command being applied. Since this function can be
2478  * called recursively when applying spooled changes, save the current
2479  * command.
2480  */
2481  saved_command = apply_error_callback_arg.command;
2483 
2484  switch (action)
2485  {
2486  case LOGICAL_REP_MSG_BEGIN:
2487  apply_handle_begin(s);
2488  break;
2489 
2492  break;
2493 
2496  break;
2497 
2500  break;
2501 
2504  break;
2505 
2508  break;
2509 
2512  break;
2513 
2514  case LOGICAL_REP_MSG_TYPE:
2515  apply_handle_type(s);
2516  break;
2517 
2520  break;
2521 
2523 
2524  /*
2525  * Logical replication does not use generic logical messages yet.
2526  * Although, it could be used by other applications that use this
2527  * output plugin.
2528  */
2529  break;
2530 
2533  break;
2534 
2537  break;
2538 
2541  break;
2542 
2545  break;
2546 
2549  break;
2550 
2553  break;
2554 
2557  break;
2558 
2561  break;
2562 
2565  break;
2566 
2567  default:
2568  ereport(ERROR,
2569  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2570  errmsg("invalid logical replication message type \"%c\"", action)));
2571  }
2572 
2573  /* Reset the current command */
2574  apply_error_callback_arg.command = saved_command;
2575 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1073
static void apply_handle_type(StringInfo s)
Definition: worker.c:1579
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:2349
static void apply_handle_update(StringInfo s)
Definition: worker.c:1780
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:1469
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:973
static ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:235
static void apply_handle_delete(StringInfo s)
Definition: worker.c:1952
static void apply_handle_begin(StringInfo s)
Definition: worker.c:810
static void apply_handle_commit(StringInfo s)
Definition: worker.c:832
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1249
static void apply_handle_relation(StringInfo s)
Definition: worker.c:1556
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:918
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1016
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1216
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1131
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:858
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1149
static void apply_handle_insert(StringInfo s)
Definition: worker.c:1644
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
LogicalRepMsgType
Definition: logicalproto.h:53
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:57
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:54
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:55
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:63
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:56
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:58
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
LogicalRepMsgType command
Definition: worker.c:225

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), and LogicalRepApplyLoop().

◆ apply_error_callback()

static void apply_error_callback ( void *  arg)
static

Definition at line 3990 of file worker.c.

3991 {
3993 
3995  return;
3996 
3997  Assert(errarg->origin_name);
3998 
3999  if (errarg->rel == NULL)
4000  {
4001  if (!TransactionIdIsValid(errarg->remote_xid))
4002  errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
4003  errarg->origin_name,
4004  logicalrep_message_type(errarg->command));
4005  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4006  errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
4007  errarg->origin_name,
4009  errarg->remote_xid);
4010  else
4011  errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
4012  errarg->origin_name,
4014  errarg->remote_xid,
4015  LSN_FORMAT_ARGS(errarg->finish_lsn));
4016  }
4017  else if (errarg->remote_attnum < 0)
4018  errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
4019  errarg->origin_name,
4021  errarg->rel->remoterel.nspname,
4022  errarg->rel->remoterel.relname,
4023  errarg->remote_xid,
4024  LSN_FORMAT_ARGS(errarg->finish_lsn));
4025  else
4026  errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
4027  errarg->origin_name,
4029  errarg->rel->remoterel.nspname,
4030  errarg->rel->remoterel.relname,
4031  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4032  errarg->remote_xid,
4033  LSN_FORMAT_ARGS(errarg->finish_lsn));
4034 }
#define errcontext
Definition: elog.h:190
Assert(fmt[strlen(fmt) - 1] !='\n')
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1197
TransactionId remote_xid
Definition: worker.c:230
XLogRecPtr finish_lsn
Definition: worker.c:231
LogicalRepRelMapEntry * rel
Definition: worker.c:226
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalRepApplyLoop().

◆ apply_handle_begin()

static void apply_handle_begin ( StringInfo  s)
static

Definition at line 810 of file worker.c.

811 {
812  LogicalRepBeginData begin_data;
813 
814  logicalrep_read_begin(s, &begin_data);
815  set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
816 
817  remote_final_lsn = begin_data.final_lsn;
818 
820 
821  in_remote_transaction = true;
822 
824 }
bool in_remote_transaction
Definition: worker.c:256
static void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:4038
static XLogRecPtr remote_final_lsn
Definition: worker.c:257
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:3860
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
Definition: proto.c:74
XLogRecPtr final_lsn
Definition: logicalproto.h:124
TransactionId xid
Definition: logicalproto.h:126

References LogicalRepBeginData::final_lsn, in_remote_transaction, logicalrep_read_begin(), maybe_start_skipping_changes(), pgstat_report_activity(), remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, and LogicalRepBeginData::xid.

Referenced by apply_dispatch().

◆ apply_handle_begin_prepare()

static void apply_handle_begin_prepare ( StringInfo  s)
static

Definition at line 858 of file worker.c.

859 {
860  LogicalRepPreparedTxnData begin_data;
861 
862  /* Tablesync should never receive prepare. */
863  if (am_tablesync_worker())
864  ereport(ERROR,
865  (errcode(ERRCODE_PROTOCOL_VIOLATION),
866  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
867 
868  logicalrep_read_begin_prepare(s, &begin_data);
869  set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
870 
871  remote_final_lsn = begin_data.prepare_lsn;
872 
874 
875  in_remote_transaction = true;
876 
878 }
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
Definition: proto.c:145
static bool am_tablesync_worker(void)

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_begin_prepare(), maybe_start_skipping_changes(), pgstat_report_activity(), LogicalRepPreparedTxnData::prepare_lsn, remote_final_lsn, set_apply_error_context_xact(), STATE_RUNNING, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_commit()

static void apply_handle_commit ( StringInfo  s)
static

Definition at line 832 of file worker.c.

833 {
834  LogicalRepCommitData commit_data;
835 
836  logicalrep_read_commit(s, &commit_data);
837 
838  if (commit_data.commit_lsn != remote_final_lsn)
839  ereport(ERROR,
840  (errcode(ERRCODE_PROTOCOL_VIOLATION),
841  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
842  LSN_FORMAT_ARGS(commit_data.commit_lsn),
844 
845  apply_handle_commit_internal(&commit_data);
846 
847  /* Process any tables that are being synchronized in parallel. */
848  process_syncing_tables(commit_data.end_lsn);
849 
852 }
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data)
Definition: worker.c:1503
static void reset_apply_error_context_info(void)
Definition: worker.c:4046
@ STATE_IDLE
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:109
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:590

References apply_handle_commit_internal(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, logicalrep_read_commit(), LSN_FORMAT_ARGS, pgstat_report_activity(), process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), and STATE_IDLE.

Referenced by apply_dispatch().

◆ apply_handle_commit_internal()

static void apply_handle_commit_internal ( LogicalRepCommitData commit_data)
static

Definition at line 1503 of file worker.c.

1504 {
1505  if (is_skipping_changes())
1506  {
1508 
1509  /*
1510  * Start a new transaction to clear the subskiplsn, if not started
1511  * yet.
1512  */
1513  if (!IsTransactionState())
1515  }
1516 
1517  if (IsTransactionState())
1518  {
1519  /*
1520  * The transaction is either non-empty or skipped, so we clear the
1521  * subskiplsn.
1522  */
1524 
1525  /*
1526  * Update origin state so we can restart streaming from correct
1527  * position in case of crash.
1528  */
1529  replorigin_session_origin_lsn = commit_data->end_lsn;
1531 
1533  pgstat_report_stat(false);
1534 
1535  store_flush_position(commit_data->end_lsn);
1536  }
1537  else
1538  {
1539  /* Process any invalidation messages that might have accumulated. */
1542  }
1543 
1544  in_remote_transaction = false;
1545 }
static void maybe_reread_subscription(void)
Definition: worker.c:3018
static void stop_skipping_changes(void)
Definition: worker.c:3887
static void store_flush_position(XLogRecPtr remote_lsn)
Definition: worker.c:2635
#define is_skipping_changes()
Definition: worker.c:277
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
Definition: worker.c:3909
void AcceptInvalidationMessages(void)
Definition: inval.c:746
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
long pgstat_report_stat(bool force)
Definition: pgstat.c:565
TimestampTz committime
Definition: logicalproto.h:133
bool IsTransactionState(void)
Definition: xact.c:374
void StartTransactionCommand(void)
Definition: xact.c:2925
void CommitTransactionCommand(void)
Definition: xact.c:3022

References AcceptInvalidationMessages(), clear_subscription_skip_lsn(), LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, CommitTransactionCommand(), LogicalRepCommitData::end_lsn, in_remote_transaction, is_skipping_changes, IsTransactionState(), maybe_reread_subscription(), pgstat_report_stat(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, StartTransactionCommand(), stop_skipping_changes(), and store_flush_position().

Referenced by apply_handle_commit(), and apply_handle_stream_commit().

◆ apply_handle_commit_prepared()

static void apply_handle_commit_prepared ( StringInfo  s)
static

Definition at line 973 of file worker.c.

974 {
975  LogicalRepCommitPreparedTxnData prepare_data;
976  char gid[GIDSIZE];
977 
978  logicalrep_read_commit_prepared(s, &prepare_data);
979  set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
980 
981  /* Compute GID for two_phase transactions. */
983  gid, sizeof(gid));
984 
985  /* There is no transaction when COMMIT PREPARED is called */
987 
988  /*
989  * Update origin state so we can restart streaming from correct position
990  * in case of crash.
991  */
992  replorigin_session_origin_lsn = prepare_data.end_lsn;
994 
995  FinishPreparedTransaction(gid, true);
998  pgstat_report_stat(false);
999 
1000  store_flush_position(prepare_data.end_lsn);
1001  in_remote_transaction = false;
1002 
1003  /* Process any tables that are being synchronized in parallel. */
1004  process_syncing_tables(prepare_data.end_lsn);
1005 
1006  clear_subscription_skip_lsn(prepare_data.end_lsn);
1007 
1010 }
static void begin_replication_step(void)
Definition: worker.c:398
static void end_replication_step(void)
Definition: worker.c:421
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
Definition: worker.c:3481
Subscription * MySubscription
Definition: worker.c:253
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
Definition: proto.c:278
void FinishPreparedTransaction(const char *gid, bool isCommit)
Definition: twophase.c:1482
#define GIDSIZE
Definition: xact.h:31

References begin_replication_step(), clear_subscription_skip_lsn(), LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, CommitTransactionCommand(), LogicalRepCommitPreparedTxnData::end_lsn, end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_commit_prepared(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_delete()

static void apply_handle_delete ( StringInfo  s)
static

Definition at line 1952 of file worker.c.

1953 {
1954  LogicalRepRelMapEntry *rel;
1955  LogicalRepTupleData oldtup;
1956  LogicalRepRelId relid;
1957  ApplyExecutionData *edata;
1958  EState *estate;
1959  TupleTableSlot *remoteslot;
1960  MemoryContext oldctx;
1961 
1962  /*
1963  * Quick return if we are skipping data modification changes or handling
1964  * streamed transactions.
1965  */
1966  if (is_skipping_changes() ||
1968  return;
1969 
1971 
1972  relid = logicalrep_read_delete(s, &oldtup);
1973  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1974  if (!should_apply_changes_for_rel(rel))
1975  {
1976  /*
1977  * The relation can't become interesting in the middle of the
1978  * transaction so it's safe to unlock it.
1979  */
1982  return;
1983  }
1984 
1985  /* Set relation for error callback */
1987 
1988  /* Check if we can do the delete. */
1990 
1991  /* Initialize the executor state. */
1992  edata = create_edata_for_relation(rel);
1993  estate = edata->estate;
1994  remoteslot = ExecInitExtraTupleSlot(estate,
1995  RelationGetDescr(rel->localrel),
1996  &TTSOpsVirtual);
1997 
1998  /* Build the search tuple. */
2000  slot_store_data(remoteslot, rel, &oldtup);
2001  MemoryContextSwitchTo(oldctx);
2002 
2003  /* For a partitioned table, apply delete to correct partition. */
2004  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2006  remoteslot, NULL, CMD_DELETE);
2007  else
2009  remoteslot);
2010 
2011  finish_edata(edata);
2012 
2013  /* Reset relation for error callback */
2015 
2017 
2019 }
static void check_relation_updatable(LogicalRepRelMapEntry *rel)
Definition: worker.c:1739
static ApplyExecutionData * create_edata_for_relation(LogicalRepRelMapEntry *rel)
Definition: worker.c:475
static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
Definition: worker.c:380
static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
Definition: worker.c:437
static void apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup, CmdType operation)
Definition: worker.c:2115
static void apply_handle_delete_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:2027
static void slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:616
static void finish_edata(ApplyExecutionData *edata)
Definition: worker.c:528
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1831
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:542
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
uint32 LogicalRepRelId
Definition: logicalproto.h:96
@ CMD_DELETE
Definition: nodes.h:724
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
Definition: proto.c:563
#define RelationGetDescr(relation)
Definition: rel.h:514
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:319
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:456
ResultRelInfo * targetRelInfo
Definition: worker.c:215
EState * estate
Definition: worker.c:212
Form_pg_class rd_rel
Definition: rel.h:109

References apply_error_callback_arg, apply_handle_delete_internal(), apply_handle_tuple_routing(), begin_replication_step(), check_relation_updatable(), CMD_DELETE, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_DELETE, logicalrep_read_delete(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_delete_internal()

static void apply_handle_delete_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 2027 of file worker.c.

2030 {
2031  EState *estate = edata->estate;
2032  Relation localrel = relinfo->ri_RelationDesc;
2033  LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2034  EPQState epqstate;
2035  TupleTableSlot *localslot;
2036  bool found;
2037 
2038  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2039  ExecOpenIndices(relinfo, false);
2040 
2041  found = FindReplTupleInLocalRel(estate, localrel, remoterel,
2042  remoteslot, &localslot);
2043 
2044  /* If found delete it. */
2045  if (found)
2046  {
2047  EvalPlanQualSetSlot(&epqstate, localslot);
2048 
2049  /* Do the actual delete. */
2051  ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2052  }
2053  else
2054  {
2055  /*
2056  * The tuple to be deleted could not be found. Do nothing except for
2057  * emitting a log message.
2058  *
2059  * XXX should this be promoted to ereport(LOG) perhaps?
2060  */
2061  elog(DEBUG1,
2062  "logical replication did not find row to be deleted "
2063  "in replication target relation \"%s\"",
2064  RelationGetRelationName(localrel));
2065  }
2066 
2067  /* Cleanup. */
2068  ExecCloseIndices(relinfo);
2069  EvalPlanQualEnd(&epqstate);
2070 }
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, LogicalRepRelation *remoterel, TupleTableSlot *remoteslot, TupleTableSlot **localslot)
Definition: worker.c:2080
static void TargetPrivilegesCheck(Relation rel, AclMode mode)
Definition: worker.c:1612
#define DEBUG1
Definition: elog.h:24
#define elog(elevel,...)
Definition: elog.h:218
void ExecCloseIndices(ResultRelInfo *resultRelInfo)
Definition: execIndexing.c:231
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
void EvalPlanQualEnd(EPQState *epqstate)
Definition: execMain.c:2932
void EvalPlanQualInit(EPQState *epqstate, EState *parentestate, Plan *subplan, List *auxrowmarks, int epqParam)
Definition: execMain.c:2511
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
#define EvalPlanQualSetSlot(epqstate, slot)
Definition: executor.h:229
#define ACL_DELETE
Definition: parsenodes.h:85
#define NIL
Definition: pg_list.h:65
#define RelationGetRelationName(relation)
Definition: rel.h:522
LogicalRepRelMapEntry * targetRel
Definition: worker.c:214
Relation ri_RelationDesc
Definition: execnodes.h:433

References ACL_DELETE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationDelete(), FindReplTupleInLocalRel(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_delete(), and apply_handle_tuple_routing().

◆ apply_handle_insert()

static void apply_handle_insert ( StringInfo  s)
static

Definition at line 1644 of file worker.c.

1645 {
1646  LogicalRepRelMapEntry *rel;
1647  LogicalRepTupleData newtup;
1648  LogicalRepRelId relid;
1649  ApplyExecutionData *edata;
1650  EState *estate;
1651  TupleTableSlot *remoteslot;
1652  MemoryContext oldctx;
1653 
1654  /*
1655  * Quick return if we are skipping data modification changes or handling
1656  * streamed transactions.
1657  */
1658  if (is_skipping_changes() ||
1660  return;
1661 
1663 
1664  relid = logicalrep_read_insert(s, &newtup);
1665  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1666  if (!should_apply_changes_for_rel(rel))
1667  {
1668  /*
1669  * The relation can't become interesting in the middle of the
1670  * transaction so it's safe to unlock it.
1671  */
1674  return;
1675  }
1676 
1677  /* Set relation for error callback */
1679 
1680  /* Initialize the executor state. */
1681  edata = create_edata_for_relation(rel);
1682  estate = edata->estate;
1683  remoteslot = ExecInitExtraTupleSlot(estate,
1684  RelationGetDescr(rel->localrel),
1685  &TTSOpsVirtual);
1686 
1687  /* Process and store remote tuple in the slot */
1689  slot_store_data(remoteslot, rel, &newtup);
1690  slot_fill_defaults(rel, estate, remoteslot);
1691  MemoryContextSwitchTo(oldctx);
1692 
1693  /* For a partitioned table, insert the tuple into a partition. */
1694  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1696  remoteslot, NULL, CMD_INSERT);
1697  else
1699  remoteslot);
1700 
1701  finish_edata(edata);
1702 
1703  /* Reset relation for error callback */
1705 
1707 
1709 }
static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot)
Definition: worker.c:1717
static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, TupleTableSlot *slot)
Definition: worker.c:559
@ CMD_INSERT
Definition: nodes.h:723
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
Definition: proto.c:436

References apply_error_callback_arg, apply_handle_insert_internal(), apply_handle_tuple_routing(), begin_replication_step(), CMD_INSERT, create_edata_for_relation(), end_replication_step(), ApplyExecutionData::estate, ExecInitExtraTupleSlot(), finish_edata(), GetPerTupleMemoryContext, handle_streamed_transaction(), is_skipping_changes, LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_INSERT, logicalrep_read_insert(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_fill_defaults(), slot_store_data(), ApplyExecutionData::targetRelInfo, and TTSOpsVirtual.

Referenced by apply_dispatch().

◆ apply_handle_insert_internal()

static void apply_handle_insert_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot 
)
static

Definition at line 1717 of file worker.c.

1720 {
1721  EState *estate = edata->estate;
1722 
1723  /* We must open indexes here. */
1724  ExecOpenIndices(relinfo, false);
1725 
1726  /* Do the insert. */
1728  ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1729 
1730  /* Cleanup. */
1731  ExecCloseIndices(relinfo);
1732 }
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
#define ACL_INSERT
Definition: parsenodes.h:82

References ACL_INSERT, ApplyExecutionData::estate, ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationInsert(), ResultRelInfo::ri_RelationDesc, and TargetPrivilegesCheck().

Referenced by apply_handle_insert(), and apply_handle_tuple_routing().

◆ apply_handle_origin()

static void apply_handle_origin ( StringInfo  s)
static

Definition at line 1131 of file worker.c.

1132 {
1133  /*
1134  * ORIGIN message can only come inside streaming transaction or inside
1135  * remote transaction and before any actual writes.
1136  */
1137  if (!in_streamed_transaction &&
1140  ereport(ERROR,
1141  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1142  errmsg_internal("ORIGIN message sent out of order")));
1143 }
static bool in_streamed_transaction
Definition: worker.c:260

References am_tablesync_worker(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, and IsTransactionState().

Referenced by apply_dispatch().

◆ apply_handle_prepare()

static void apply_handle_prepare ( StringInfo  s)
static

Definition at line 918 of file worker.c.

919 {
920  LogicalRepPreparedTxnData prepare_data;
921 
922  logicalrep_read_prepare(s, &prepare_data);
923 
924  if (prepare_data.prepare_lsn != remote_final_lsn)
925  ereport(ERROR,
926  (errcode(ERRCODE_PROTOCOL_VIOLATION),
927  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
928  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
930 
931  /*
932  * Unlike commit, here, we always prepare the transaction even though no
933  * change has happened in this transaction or all changes are skipped. It
934  * is done this way because at commit prepared time, we won't know whether
935  * we have skipped preparing a transaction because of those reasons.
936  *
937  * XXX, We can optimize such that at commit prepared time, we first check
938  * whether we have prepared the transaction or not but that doesn't seem
939  * worthwhile because such cases shouldn't be common.
940  */
942 
943  apply_handle_prepare_internal(&prepare_data);
944 
947  pgstat_report_stat(false);
948 
949  store_flush_position(prepare_data.end_lsn);
950 
951  in_remote_transaction = false;
952 
953  /* Process any tables that are being synchronized in parallel. */
954  process_syncing_tables(prepare_data.end_lsn);
955 
956  /*
957  * Since we have already prepared the transaction, in a case where the
958  * server crashes before clearing the subskiplsn, it will be left but the
959  * transaction won't be resent. But that's okay because it's a rare case
960  * and the subskiplsn will be cleared when finishing the next transaction.
961  */
964 
967 }
static void apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
Definition: worker.c:884
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:239

References apply_handle_prepare_internal(), begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, logicalrep_read_prepare(), LSN_FORMAT_ARGS, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), remote_final_lsn, reset_apply_error_context_info(), STATE_IDLE, stop_skipping_changes(), and store_flush_position().

Referenced by apply_dispatch().

◆ apply_handle_prepare_internal()

static void apply_handle_prepare_internal ( LogicalRepPreparedTxnData prepare_data)
static

Definition at line 884 of file worker.c.

885 {
886  char gid[GIDSIZE];
887 
888  /*
889  * Compute unique GID for two_phase transactions. We don't use GID of
890  * prepared transaction sent by server as that can lead to deadlock when
891  * we have multiple subscriptions from same node point to publications on
892  * the same node. See comments atop worker.c
893  */
895  gid, sizeof(gid));
896 
897  /*
898  * BeginTransactionBlock is necessary to balance the EndTransactionBlock
899  * called within the PrepareTransactionBlock below.
900  */
902  CommitTransactionCommand(); /* Completes the preceding Begin command. */
903 
904  /*
905  * Update origin state so we can restart streaming from correct position
906  * in case of crash.
907  */
908  replorigin_session_origin_lsn = prepare_data->end_lsn;
910 
912 }
bool PrepareTransactionBlock(const char *gid)
Definition: xact.c:3788
void BeginTransactionBlock(void)
Definition: xact.c:3720

References BeginTransactionBlock(), CommitTransactionCommand(), LogicalRepPreparedTxnData::end_lsn, GIDSIZE, MySubscription, Subscription::oid, LogicalRepPreparedTxnData::prepare_time, PrepareTransactionBlock(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, TwoPhaseTransactionGid(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_prepare(), and apply_handle_stream_prepare().

◆ apply_handle_relation()

static void apply_handle_relation ( StringInfo  s)
static

Definition at line 1556 of file worker.c.

1557 {
1558  LogicalRepRelation *rel;
1559 
1561  return;
1562 
1563  rel = logicalrep_read_rel(s);
1565 
1566  /* Also reset all entries in the partition map that refer to remoterel. */
1568 }
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
Definition: proto.c:699
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
Definition: relation.c:523
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157

References handle_streamed_transaction(), LOGICAL_REP_MSG_RELATION, logicalrep_partmap_reset_relmap(), logicalrep_read_rel(), and logicalrep_relmap_update().

Referenced by apply_dispatch().

◆ apply_handle_rollback_prepared()

static void apply_handle_rollback_prepared ( StringInfo  s)
static

Definition at line 1016 of file worker.c.

1017 {
1018  LogicalRepRollbackPreparedTxnData rollback_data;
1019  char gid[GIDSIZE];
1020 
1021  logicalrep_read_rollback_prepared(s, &rollback_data);
1022  set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1023 
1024  /* Compute GID for two_phase transactions. */
1025  TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1026  gid, sizeof(gid));
1027 
1028  /*
1029  * It is possible that we haven't received prepare because it occurred
1030  * before walsender reached a consistent point or the two_phase was still
1031  * not enabled by that time, so in such cases, we need to skip rollback
1032  * prepared.
1033  */
1034  if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1035  rollback_data.prepare_time))
1036  {
1037  /*
1038  * Update origin state so we can restart streaming from correct
1039  * position in case of crash.
1040  */
1043 
1044  /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1046  FinishPreparedTransaction(gid, false);
1049 
1051  }
1052 
1053  pgstat_report_stat(false);
1054 
1055  store_flush_position(rollback_data.rollback_end_lsn);
1056  in_remote_transaction = false;
1057 
1058  /* Process any tables that are being synchronized in parallel. */
1060 
1063 }
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
Definition: proto.c:336
bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp)
Definition: twophase.c:2579

References begin_replication_step(), clear_subscription_skip_lsn(), CommitTransactionCommand(), end_replication_step(), FinishPreparedTransaction(), GIDSIZE, in_remote_transaction, logicalrep_read_rollback_prepared(), LookupGXact(), MySubscription, Subscription::oid, pgstat_report_activity(), pgstat_report_stat(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, process_syncing_tables(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, reset_apply_error_context_info(), LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, set_apply_error_context_xact(), STATE_IDLE, store_flush_position(), TwoPhaseTransactionGid(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_abort()

static void apply_handle_stream_abort ( StringInfo  s)
static

Definition at line 1249 of file worker.c.

1250 {
1251  TransactionId xid;
1252  TransactionId subxid;
1253 
1255  ereport(ERROR,
1256  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1257  errmsg_internal("STREAM ABORT message without STREAM STOP")));
1258 
1259  logicalrep_read_stream_abort(s, &xid, &subxid);
1260 
1261  /*
1262  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1263  * just delete the files with serialized info.
1264  */
1265  if (xid == subxid)
1266  {
1269  }
1270  else
1271  {
1272  /*
1273  * OK, so it's a subxact. We need to read the subxact file for the
1274  * toplevel transaction, determine the offset tracked for the subxact,
1275  * and truncate the file with changes. We also remove the subxacts
1276  * with higher offsets (or rather higher XIDs).
1277  *
1278  * We intentionally scan the array from the tail, because we're likely
1279  * aborting a change for the most recent subtransactions.
1280  *
1281  * We can't use the binary search here as subxact XIDs won't
1282  * necessarily arrive in sorted order, consider the case where we have
1283  * released the savepoint for multiple subtransactions and then
1284  * performed rollback to savepoint for one of the earlier
1285  * sub-transaction.
1286  */
1287  int64 i;
1288  int64 subidx;
1289  BufFile *fd;
1290  bool found = false;
1291  char path[MAXPGPATH];
1292 
1294 
1295  subidx = -1;
1298 
1299  for (i = subxact_data.nsubxacts; i > 0; i--)
1300  {
1301  if (subxact_data.subxacts[i - 1].xid == subxid)
1302  {
1303  subidx = (i - 1);
1304  found = true;
1305  break;
1306  }
1307  }
1308 
1309  /*
1310  * If it's an empty sub-transaction then we will not find the subxid
1311  * here so just cleanup the subxact info and return.
1312  */
1313  if (!found)
1314  {
1315  /* Cleanup the subxact info */
1320  return;
1321  }
1322 
1323  /* open the changes file */
1326  O_RDWR, false);
1327 
1328  /* OK, truncate the file at the right offset */
1330  subxact_data.subxacts[subidx].offset);
1331  BufFileClose(fd);
1332 
1333  /* discard the subxacts added later */
1334  subxact_data.nsubxacts = subidx;
1335 
1336  /* write the updated subxact list */
1338 
1341  }
1342 
1344 }
static void cleanup_subxact_info(void)
Definition: worker.c:3464
static void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:3340
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3326
static ApplySubXactData subxact_data
Definition: worker.c:298
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:3132
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:3181
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:286
void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
Definition: buffile.c:900
void BufFileClose(BufFile *file)
Definition: buffile.c:407
uint32 TransactionId
Definition: c.h:598
int i
Definition: isn.c:73
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:58
#define MAXPGPATH
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid)
Definition: proto.c:1184
uint32 nsubxacts
Definition: worker.c:292
SubXactInfo * subxacts
Definition: worker.c:295
FileSet * stream_fileset
off_t offset
Definition: worker.c:286
TransactionId xid
Definition: worker.c:284
int fileno
Definition: worker.c:285
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileTruncateFileSet(), changes_filename(), cleanup_subxact_info(), CommitTransactionCommand(), end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, fd(), SubXactInfo::fileno, i, in_streamed_transaction, InvalidXLogRecPtr, logicalrep_read_stream_abort(), MAXPGPATH, MyLogicalRepWorker, ApplySubXactData::nsubxacts, SubXactInfo::offset, reset_apply_error_context_info(), set_apply_error_context_xact(), stream_cleanup_files(), LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subxact_data, subxact_info_read(), subxact_info_write(), ApplySubXactData::subxacts, and SubXactInfo::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_commit()

static void apply_handle_stream_commit ( StringInfo  s)
static

Definition at line 1469 of file worker.c.

1470 {
1471  TransactionId xid;
1472  LogicalRepCommitData commit_data;
1473 
1475  ereport(ERROR,
1476  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1477  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1478 
1479  xid = logicalrep_read_stream_commit(s, &commit_data);
1480  set_apply_error_context_xact(xid, commit_data.commit_lsn);
1481 
1482  elog(DEBUG1, "received commit for streamed transaction %u", xid);
1483 
1484  apply_spooled_messages(xid, commit_data.commit_lsn);
1485 
1486  apply_handle_commit_internal(&commit_data);
1487 
1488  /* unlink the files with serialized changes and subxact info */
1490 
1491  /* Process any tables that are being synchronized in parallel. */
1492  process_syncing_tables(commit_data.end_lsn);
1493 
1495 
1497 }
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:1350
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
Definition: proto.c:1142

References apply_handle_commit_internal(), apply_spooled_messages(), LogicalRepCommitData::commit_lsn, DEBUG1, elog, LogicalRepCommitData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, logicalrep_read_stream_commit(), MyLogicalRepWorker, pgstat_report_activity(), process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, stream_cleanup_files(), and LogicalRepWorker::subid.

Referenced by apply_dispatch().

◆ apply_handle_stream_prepare()

static void apply_handle_stream_prepare ( StringInfo  s)
static

Definition at line 1073 of file worker.c.

1074 {
1075  LogicalRepPreparedTxnData prepare_data;
1076 
1078  ereport(ERROR,
1079  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1080  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1081 
1082  /* Tablesync should never receive prepare. */
1083  if (am_tablesync_worker())
1084  ereport(ERROR,
1085  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1086  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1087 
1088  logicalrep_read_stream_prepare(s, &prepare_data);
1089  set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1090 
1091  elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1092 
1093  /* Replay all the spooled operations. */
1094  apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1095 
1096  /* Mark the transaction as prepared. */
1097  apply_handle_prepare_internal(&prepare_data);
1098 
1100 
1101  pgstat_report_stat(false);
1102 
1103  store_flush_position(prepare_data.end_lsn);
1104 
1105  in_remote_transaction = false;
1106 
1107  /* unlink the files with serialized changes and subxact info. */
1109 
1110  /* Process any tables that are being synchronized in parallel. */
1111  process_syncing_tables(prepare_data.end_lsn);
1112 
1113  /*
1114  * Similar to prepare case, the subskiplsn could be left in a case of
1115  * server crash but it's okay. See the comments in apply_handle_prepare().
1116  */
1119 
1121 
1123 }
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:376

References am_tablesync_worker(), apply_handle_prepare_internal(), apply_spooled_messages(), clear_subscription_skip_lsn(), CommitTransactionCommand(), DEBUG1, elog, LogicalRepPreparedTxnData::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, in_remote_transaction, in_streamed_transaction, logicalrep_read_stream_prepare(), MyLogicalRepWorker, pgstat_report_activity(), pgstat_report_stat(), LogicalRepPreparedTxnData::prepare_lsn, process_syncing_tables(), reset_apply_error_context_info(), set_apply_error_context_xact(), STATE_IDLE, stop_skipping_changes(), store_flush_position(), stream_cleanup_files(), LogicalRepWorker::subid, and LogicalRepPreparedTxnData::xid.

Referenced by apply_dispatch().

◆ apply_handle_stream_start()

static void apply_handle_stream_start ( StringInfo  s)
static

Definition at line 1149 of file worker.c.

1150 {
1151  bool first_segment;
1152 
1154  ereport(ERROR,
1155  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1156  errmsg_internal("duplicate STREAM START message")));
1157 
1158  /*
1159  * Start a transaction on stream start, this transaction will be committed
1160  * on the stream stop unless it is a tablesync worker in which case it
1161  * will be committed after processing all the messages. We need the
1162  * transaction for handling the buffile, used for serializing the
1163  * streaming data and subxact info.
1164  */
1166 
1167  /* notify handle methods we're processing a remote transaction */
1168  in_streamed_transaction = true;
1169 
1170  /* extract XID of the top-level transaction */
1171  stream_xid = logicalrep_read_stream_start(s, &first_segment);
1172 
1174  ereport(ERROR,
1175  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1176  errmsg_internal("invalid transaction ID in streamed replication transaction")));
1177 
1179 
1180  /*
1181  * Initialize the worker's stream_fileset if we haven't yet. This will be
1182  * used for the entire duration of the worker so create it in a permanent
1183  * context. We create this on the very first streaming message from any
1184  * transaction and then use it for this and other streaming transactions.
1185  * Now, we could create a fileset at the start of the worker as well but
1186  * then we won't be sure that it will ever be used.
1187  */
1188  if (MyLogicalRepWorker->stream_fileset == NULL)
1189  {
1190  MemoryContext oldctx;
1191 
1193 
1196 
1197  MemoryContextSwitchTo(oldctx);
1198  }
1199 
1200  /* open the spool file for this transaction */
1202 
1203  /* if this is not the first segment, open existing subxact file */
1204  if (!first_segment)
1206 
1208 
1210 }
MemoryContext ApplyContext
Definition: worker.c:246
static void stream_open_file(Oid subid, TransactionId xid, bool first)
Definition: worker.c:3367
static TransactionId stream_xid
Definition: worker.c:262
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54
void * palloc(Size size)
Definition: mcxt.c:1068
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
Definition: proto.c:1092

References ApplyContext, begin_replication_step(), end_replication_step(), ereport, errcode(), errmsg_internal(), ERROR, FileSetInit(), in_streamed_transaction, InvalidXLogRecPtr, logicalrep_read_stream_start(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), set_apply_error_context_xact(), STATE_RUNNING, LogicalRepWorker::stream_fileset, stream_open_file(), stream_xid, LogicalRepWorker::subid, subxact_info_read(), and TransactionIdIsValid.

Referenced by apply_dispatch().

◆ apply_handle_stream_stop()

static void apply_handle_stream_stop ( StringInfo  s)
static

Definition at line 1216 of file worker.c.

1217 {
1219  ereport(ERROR,
1220  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1221  errmsg_internal("STREAM STOP message without STREAM START")));
1222 
1223  /*
1224  * Close the file with serialized changes, and serialize information about
1225  * subxacts for the toplevel transaction.
1226  */
1229 
1230  /* We must be in a valid transaction state */
1232 
1233  /* Commit the per-stream transaction */
1235 
1236  in_streamed_transaction = false;
1237 
1238  /* Reset per-stream context */
1240 
1243 }
static MemoryContext LogicalStreamingContext
Definition: worker.c:249
static void stream_close_file(void)
Definition: worker.c:3416
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143

References Assert(), CommitTransactionCommand(), ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, pgstat_report_activity(), reset_apply_error_context_info(), STATE_IDLE, stream_close_file(), stream_xid, LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_dispatch().

◆ apply_handle_truncate()

static void apply_handle_truncate ( StringInfo  s)
static

Definition at line 2349 of file worker.c.

2350 {
2351  bool cascade = false;
2352  bool restart_seqs = false;
2353  List *remote_relids = NIL;
2354  List *remote_rels = NIL;
2355  List *rels = NIL;
2356  List *part_rels = NIL;
2357  List *relids = NIL;
2358  List *relids_logged = NIL;
2359  ListCell *lc;
2360  LOCKMODE lockmode = AccessExclusiveLock;
2361 
2362  /*
2363  * Quick return if we are skipping data modification changes or handling
2364  * streamed transactions.
2365  */
2366  if (is_skipping_changes() ||
2368  return;
2369 
2371 
2372  remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
2373 
2374  foreach(lc, remote_relids)
2375  {
2376  LogicalRepRelId relid = lfirst_oid(lc);
2377  LogicalRepRelMapEntry *rel;
2378 
2379  rel = logicalrep_rel_open(relid, lockmode);
2380  if (!should_apply_changes_for_rel(rel))
2381  {
2382  /*
2383  * The relation can't become interesting in the middle of the
2384  * transaction so it's safe to unlock it.
2385  */
2386  logicalrep_rel_close(rel, lockmode);
2387  continue;
2388  }
2389 
2390  remote_rels = lappend(remote_rels, rel);
2392  rels = lappend(rels, rel->localrel);
2393  relids = lappend_oid(relids, rel->localreloid);
2395  relids_logged = lappend_oid(relids_logged, rel->localreloid);
2396 
2397  /*
2398  * Truncate partitions if we got a message to truncate a partitioned
2399  * table.
2400  */
2401  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2402  {
2403  ListCell *child;
2404  List *children = find_all_inheritors(rel->localreloid,
2405  lockmode,
2406  NULL);
2407 
2408  foreach(child, children)
2409  {
2410  Oid childrelid = lfirst_oid(child);
2411  Relation childrel;
2412 
2413  if (list_member_oid(relids, childrelid))
2414  continue;
2415 
2416  /* find_all_inheritors already got lock */
2417  childrel = table_open(childrelid, NoLock);
2418 
2419  /*
2420  * Ignore temp tables of other backends. See similar code in
2421  * ExecuteTruncate().
2422  */
2423  if (RELATION_IS_OTHER_TEMP(childrel))
2424  {
2425  table_close(childrel, lockmode);
2426  continue;
2427  }
2428 
2430  rels = lappend(rels, childrel);
2431  part_rels = lappend(part_rels, childrel);
2432  relids = lappend_oid(relids, childrelid);
2433  /* Log this relation only if needed for logical decoding */
2434  if (RelationIsLogicallyLogged(childrel))
2435  relids_logged = lappend_oid(relids_logged, childrelid);
2436  }
2437  }
2438  }
2439 
2440  /*
2441  * Even if we used CASCADE on the upstream primary we explicitly default
2442  * to replaying changes without further cascading. This might be later
2443  * changeable with a user specified option.
2444  */
2445  ExecuteTruncateGuts(rels,
2446  relids,
2447  relids_logged,
2448  DROP_RESTRICT,
2449  restart_seqs);
2450  foreach(lc, remote_rels)
2451  {
2452  LogicalRepRelMapEntry *rel = lfirst(lc);
2453 
2455  }
2456  foreach(lc, part_rels)
2457  {
2458  Relation rel = lfirst(lc);
2459 
2460  table_close(rel, NoLock);
2461  }
2462 
2464 }
List * lappend(List *list, void *datum)
Definition: list.c:336
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:701
int LOCKMODE
Definition: lockdefs.h:26
#define AccessExclusiveLock
Definition: lockdefs.h:43
@ DROP_RESTRICT
Definition: parsenodes.h:2207
#define ACL_TRUNCATE
Definition: parsenodes.h:86
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:169
#define lfirst_oid(lc)
Definition: pg_list.h:171
unsigned int Oid
Definition: postgres_ext.h:31
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
Definition: proto.c:617
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:685
#define RELATION_IS_OTHER_TEMP(relation)
Definition: rel.h:642
Definition: pg_list.h:51
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs)
Definition: tablecmds.c:1782

References AccessExclusiveLock, ACL_TRUNCATE, begin_replication_step(), DROP_RESTRICT, end_replication_step(), ExecuteTruncateGuts(), find_all_inheritors(), handle_streamed_transaction(), is_skipping_changes, lappend(), lappend_oid(), lfirst, lfirst_oid, list_member_oid(), LogicalRepRelMapEntry::localrel, LogicalRepRelMapEntry::localreloid, LOGICAL_REP_MSG_TRUNCATE, logicalrep_read_truncate(), logicalrep_rel_close(), logicalrep_rel_open(), NIL, NoLock, RelationData::rd_rel, RELATION_IS_OTHER_TEMP, RelationIsLogicallyLogged, should_apply_changes_for_rel(), table_close(), table_open(), and TargetPrivilegesCheck().

Referenced by apply_dispatch().

◆ apply_handle_tuple_routing()

static void apply_handle_tuple_routing ( ApplyExecutionData edata,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup,
CmdType  operation 
)
static

Definition at line 2115 of file worker.c.

2119 {
2120  EState *estate = edata->estate;
2121  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2122  ResultRelInfo *relinfo = edata->targetRelInfo;
2123  Relation parentrel = relinfo->ri_RelationDesc;
2124  ModifyTableState *mtstate;
2125  PartitionTupleRouting *proute;
2126  ResultRelInfo *partrelinfo;
2127  Relation partrel;
2128  TupleTableSlot *remoteslot_part;
2129  TupleConversionMap *map;
2130  MemoryContext oldctx;
2131  LogicalRepRelMapEntry *part_entry = NULL;
2132  AttrMap *attrmap = NULL;
2133 
2134  /* ModifyTableState is needed for ExecFindPartition(). */
2135  edata->mtstate = mtstate = makeNode(ModifyTableState);
2136  mtstate->ps.plan = NULL;
2137  mtstate->ps.state = estate;
2138  mtstate->operation = operation;
2139  mtstate->resultRelInfo = relinfo;
2140 
2141  /* ... as is PartitionTupleRouting. */
2142  edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2143 
2144  /*
2145  * Find the partition to which the "search tuple" belongs.
2146  */
2147  Assert(remoteslot != NULL);
2149  partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2150  remoteslot, estate);
2151  Assert(partrelinfo != NULL);
2152  partrel = partrelinfo->ri_RelationDesc;
2153 
2154  /*
2155  * To perform any of the operations below, the tuple must match the
2156  * partition's rowtype. Convert if needed or just copy, using a dedicated
2157  * slot to store the tuple in any case.
2158  */
2159  remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
2160  if (remoteslot_part == NULL)
2161  remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
2162  map = partrelinfo->ri_RootToPartitionMap;
2163  if (map != NULL)
2164  {
2165  attrmap = map->attrMap;
2166  remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2167  remoteslot_part);
2168  }
2169  else
2170  {
2171  remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2172  slot_getallattrs(remoteslot_part);
2173  }
2174  MemoryContextSwitchTo(oldctx);
2175 
2176  /* Check if we can do the update or delete on the leaf partition. */
2177  if (operation == CMD_UPDATE || operation == CMD_DELETE)
2178  {
2179  part_entry = logicalrep_partition_open(relmapentry, partrel,
2180  attrmap);
2181  check_relation_updatable(part_entry);
2182  }
2183 
2184  switch (operation)
2185  {
2186  case CMD_INSERT:
2187  apply_handle_insert_internal(edata, partrelinfo,
2188  remoteslot_part);
2189  break;
2190 
2191  case CMD_DELETE:
2192  apply_handle_delete_internal(edata, partrelinfo,
2193  remoteslot_part);
2194  break;
2195 
2196  case CMD_UPDATE:
2197 
2198  /*
2199  * For UPDATE, depending on whether or not the updated tuple
2200  * satisfies the partition's constraint, perform a simple UPDATE
2201  * of the partition or move the updated tuple into a different
2202  * suitable partition.
2203  */
2204  {
2205  TupleTableSlot *localslot;
2206  ResultRelInfo *partrelinfo_new;
2207  bool found;
2208 
2209  /* Get the matching local tuple from the partition. */
2210  found = FindReplTupleInLocalRel(estate, partrel,
2211  &part_entry->remoterel,
2212  remoteslot_part, &localslot);
2213  if (!found)
2214  {
2215  /*
2216  * The tuple to be updated could not be found. Do nothing
2217  * except for emitting a log message.
2218  *
2219  * XXX should this be promoted to ereport(LOG) perhaps?
2220  */
2221  elog(DEBUG1,
2222  "logical replication did not find row to be updated "
2223  "in replication target relation's partition \"%s\"",
2224  RelationGetRelationName(partrel));
2225  return;
2226  }
2227 
2228  /*
2229  * Apply the update to the local tuple, putting the result in
2230  * remoteslot_part.
2231  */
2233  slot_modify_data(remoteslot_part, localslot, part_entry,
2234  newtup);
2235  MemoryContextSwitchTo(oldctx);
2236 
2237  /*
2238  * Does the updated tuple still satisfy the current
2239  * partition's constraint?
2240  */
2241  if (!partrel->rd_rel->relispartition ||
2242  ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
2243  false))
2244  {
2245  /*
2246  * Yes, so simply UPDATE the partition. We don't call
2247  * apply_handle_update_internal() here, which would
2248  * normally do the following work, to avoid repeating some
2249  * work already done above to find the local tuple in the
2250  * partition.
2251  */
2252  EPQState epqstate;
2253 
2254  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2255  ExecOpenIndices(partrelinfo, false);
2256 
2257  EvalPlanQualSetSlot(&epqstate, remoteslot_part);
2259  ACL_UPDATE);
2260  ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
2261  localslot, remoteslot_part);
2262  ExecCloseIndices(partrelinfo);
2263  EvalPlanQualEnd(&epqstate);
2264  }
2265  else
2266  {
2267  /* Move the tuple into the new partition. */
2268 
2269  /*
2270  * New partition will be found using tuple routing, which
2271  * can only occur via the parent table. We might need to
2272  * convert the tuple to the parent's rowtype. Note that
2273  * this is the tuple found in the partition, not the
2274  * original search tuple received by this function.
2275  */
2276  if (map)
2277  {
2278  TupleConversionMap *PartitionToRootMap =
2280  RelationGetDescr(parentrel));
2281 
2282  remoteslot =
2283  execute_attr_map_slot(PartitionToRootMap->attrMap,
2284  remoteslot_part, remoteslot);
2285  }
2286  else
2287  {
2288  remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
2289  slot_getallattrs(remoteslot);
2290  }
2291 
2292 
2293  /* Find the new partition. */
2295  partrelinfo_new = ExecFindPartition(mtstate, relinfo,
2296  proute, remoteslot,
2297  estate);
2298  MemoryContextSwitchTo(oldctx);
2299  Assert(partrelinfo_new != partrelinfo);
2300 
2301  /* DELETE old tuple found in the old partition. */
2302  apply_handle_delete_internal(edata, partrelinfo,
2303  localslot);
2304 
2305  /* INSERT new tuple into the new partition. */
2306 
2307  /*
2308  * Convert the replacement tuple to match the destination
2309  * partition rowtype.
2310  */
2312  partrel = partrelinfo_new->ri_RelationDesc;
2313  remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
2314  if (remoteslot_part == NULL)
2315  remoteslot_part = table_slot_create(partrel,
2316  &estate->es_tupleTable);
2317  map = partrelinfo_new->ri_RootToPartitionMap;
2318  if (map != NULL)
2319  {
2320  remoteslot_part = execute_attr_map_slot(map->attrMap,
2321  remoteslot,
2322  remoteslot_part);
2323  }
2324  else
2325  {
2326  remoteslot_part = ExecCopySlot(remoteslot_part,
2327  remoteslot);
2328  slot_getallattrs(remoteslot);
2329  }
2330  MemoryContextSwitchTo(oldctx);
2331  apply_handle_insert_internal(edata, partrelinfo_new,
2332  remoteslot_part);
2333  }
2334  }
2335  break;
2336 
2337  default:
2338  elog(ERROR, "unrecognized CmdType: %d", (int) operation);
2339  break;
2340  }
2341 }
static void slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, LogicalRepRelMapEntry *rel, LogicalRepTupleData *tupleData)
Definition: worker.c:717
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1782
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
@ CMD_UPDATE
Definition: nodes.h:722
#define makeNode(_type_)
Definition: nodes.h:621
#define ACL_UPDATE
Definition: parsenodes.h:84
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
Definition: relation.c:585
PartitionTupleRouting * proute
Definition: worker.c:219
ModifyTableState * mtstate
Definition: worker.c:218
Definition: attmap.h:35
List * es_tupleTable
Definition: execnodes.h:634
CmdType operation
Definition: execnodes.h:1226
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1230
PlanState ps
Definition: execnodes.h:1225
Plan * plan
Definition: execnodes.h:998
EState * state
Definition: execnodes.h:1000
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:540
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:539
AttrMap * attrMap
Definition: tupconvert.h:28
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354

References ACL_UPDATE, apply_handle_delete_internal(), apply_handle_insert_internal(), Assert(), TupleConversionMap::attrMap, check_relation_updatable(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, convert_tuples_by_name(), DEBUG1, elog, ERROR, EState::es_tupleTable, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecCloseIndices(), ExecCopySlot(), ExecFindPartition(), ExecOpenIndices(), ExecPartitionCheck(), ExecSetupPartitionTupleRouting(), ExecSimpleRelationUpdate(), execute_attr_map_slot(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, logicalrep_partition_open(), makeNode, MemoryContextSwitchTo(), ApplyExecutionData::mtstate, NIL, ModifyTableState::operation, PlanState::plan, ApplyExecutionData::proute, ModifyTableState::ps, RelationData::rd_rel, RelationGetDescr, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ModifyTableState::resultRelInfo, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_RootToPartitionMap, slot_getallattrs(), slot_modify_data(), PlanState::state, table_slot_create(), TargetPrivilegesCheck(), ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ apply_handle_type()

static void apply_handle_type ( StringInfo  s)
static

Definition at line 1579 of file worker.c.

1580 {
1581  LogicalRepTyp typ;
1582 
1584  return;
1585 
1586  logicalrep_read_typ(s, &typ);
1587 }
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
Definition: proto.c:755

References handle_streamed_transaction(), LOGICAL_REP_MSG_TYPE, and logicalrep_read_typ().

Referenced by apply_dispatch().

◆ apply_handle_update()

static void apply_handle_update ( StringInfo  s)
static

Definition at line 1780 of file worker.c.

1781 {
1782  LogicalRepRelMapEntry *rel;
1783  LogicalRepRelId relid;
1784  ApplyExecutionData *edata;
1785  EState *estate;
1786  LogicalRepTupleData oldtup;
1787  LogicalRepTupleData newtup;
1788  bool has_oldtup;
1789  TupleTableSlot *remoteslot;
1790  RangeTblEntry *target_rte;
1791  MemoryContext oldctx;
1792 
1793  /*
1794  * Quick return if we are skipping data modification changes or handling
1795  * streamed transactions.
1796  */
1797  if (is_skipping_changes() ||
1799  return;
1800 
1802 
1803  relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1804  &newtup);
1805  rel = logicalrep_rel_open(relid, RowExclusiveLock);
1806  if (!should_apply_changes_for_rel(rel))
1807  {
1808  /*
1809  * The relation can't become interesting in the middle of the
1810  * transaction so it's safe to unlock it.
1811  */
1814  return;
1815  }
1816 
1817  /* Set relation for error callback */
1819 
1820  /* Check if we can do the update. */
1822 
1823  /* Initialize the executor state. */
1824  edata = create_edata_for_relation(rel);
1825  estate = edata->estate;
1826  remoteslot = ExecInitExtraTupleSlot(estate,
1827  RelationGetDescr(rel->localrel),
1828  &TTSOpsVirtual);
1829 
1830  /*
1831  * Populate updatedCols so that per-column triggers can fire, and so
1832  * executor can correctly pass down indexUnchanged hint. This could
1833  * include more columns than were actually changed on the publisher
1834  * because the logical replication protocol doesn't contain that
1835  * information. But it would for example exclude columns that only exist
1836  * on the subscriber, since we are not touching those.
1837  */
1838  target_rte = list_nth(estate->es_range_table, 0);
1839  for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1840  {
1842  int remoteattnum = rel->attrmap->attnums[i];
1843 
1844  if (!att->attisdropped && remoteattnum >= 0)
1845  {
1846  Assert(remoteattnum < newtup.ncols);
1847  if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1848  target_rte->updatedCols =
1849  bms_add_member(target_rte->updatedCols,
1851  }
1852  }
1853 
1854  /* Also populate extraUpdatedCols, in case we have generated columns */
1855  fill_extraUpdatedCols(target_rte, rel->localrel);
1856 
1857  /* Build the search tuple. */
1859  slot_store_data(remoteslot, rel,
1860  has_oldtup ? &oldtup : &newtup);
1861  MemoryContextSwitchTo(oldctx);
1862 
1863  /* For a partitioned table, apply update to correct partition. */
1864  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1866  remoteslot, &newtup, CMD_UPDATE);
1867  else
1869  remoteslot, &newtup);
1870 
1871  finish_edata(edata);
1872 
1873  /* Reset relation for error callback */
1875 
1877 
1879 }
static void apply_handle_update_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, LogicalRepTupleData *newtup)
Definition: worker.c:1887
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
#define LOGICALREP_COLUMN_UNCHANGED
Definition: logicalproto.h:92
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
Definition: proto.c:492
void fill_extraUpdatedCols(RangeTblEntry *target_rte, Relation target_relation)
AttrNumber * attnums
Definition: attmap.h:36
List * es_range_table
Definition: execnodes.h:592
Bitmapset * updatedCols
Definition: parsenodes.h:1169
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References apply_error_callback_arg, apply_handle_tuple_routing(), apply_handle_update_internal(), Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, begin_replication_step(), bms_add_member(), check_relation_updatable(), CMD_UPDATE, LogicalRepTupleData::colstatus, create_edata_for_relation(), end_replication_step(), EState::es_range_table, ApplyExecutionData::estate, ExecInitExtraTupleSlot(), fill_extraUpdatedCols(), finish_edata(), FirstLowInvalidHeapAttributeNumber, GetPerTupleMemoryContext, handle_streamed_transaction(), i, is_skipping_changes, list_nth(), LogicalRepRelMapEntry::localrel, LOGICAL_REP_MSG_UPDATE, LOGICALREP_COLUMN_UNCHANGED, logicalrep_read_update(), logicalrep_rel_close(), logicalrep_rel_open(), MemoryContextSwitchTo(), TupleDescData::natts, LogicalRepTupleData::ncols, NoLock, RelationData::rd_rel, ApplyErrorCallbackArg::rel, RelationGetDescr, RowExclusiveLock, should_apply_changes_for_rel(), slot_store_data(), ApplyExecutionData::targetRelInfo, TupleTableSlot::tts_tupleDescriptor, TTSOpsVirtual, TupleDescAttr, and RangeTblEntry::updatedCols.

Referenced by apply_dispatch().

◆ apply_handle_update_internal()

static void apply_handle_update_internal ( ApplyExecutionData edata,
ResultRelInfo relinfo,
TupleTableSlot remoteslot,
LogicalRepTupleData newtup 
)
static

Definition at line 1887 of file worker.c.

1891 {
1892  EState *estate = edata->estate;
1893  LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1894  Relation localrel = relinfo->ri_RelationDesc;
1895  EPQState epqstate;
1896  TupleTableSlot *localslot;
1897  bool found;
1898  MemoryContext oldctx;
1899 
1900  EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1901  ExecOpenIndices(relinfo, false);
1902 
1903  found = FindReplTupleInLocalRel(estate, localrel,
1904  &relmapentry->remoterel,
1905  remoteslot, &localslot);
1906  ExecClearTuple(remoteslot);
1907 
1908  /*
1909  * Tuple found.
1910  *
1911  * Note this will fail if there are other conflicting unique indexes.
1912  */
1913  if (found)
1914  {
1915  /* Process and store remote tuple in the slot */
1917  slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1918  MemoryContextSwitchTo(oldctx);
1919 
1920  EvalPlanQualSetSlot(&epqstate, remoteslot);
1921 
1922  /* Do the actual update. */
1924  ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1925  remoteslot);
1926  }
1927  else
1928  {
1929  /*
1930  * The tuple to be updated could not be found. Do nothing except for
1931  * emitting a log message.
1932  *
1933  * XXX should this be promoted to ereport(LOG) perhaps?
1934  */
1935  elog(DEBUG1,
1936  "logical replication did not find row to be updated "
1937  "in replication target relation \"%s\"",
1938  RelationGetRelationName(localrel));
1939  }
1940 
1941  /* Cleanup. */
1942  ExecCloseIndices(relinfo);
1943  EvalPlanQualEnd(&epqstate);
1944 }
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425

References ACL_UPDATE, DEBUG1, elog, ApplyExecutionData::estate, EvalPlanQualEnd(), EvalPlanQualInit(), EvalPlanQualSetSlot, ExecClearTuple(), ExecCloseIndices(), ExecOpenIndices(), ExecSimpleRelationUpdate(), FindReplTupleInLocalRel(), GetPerTupleMemoryContext, MemoryContextSwitchTo(), NIL, RelationGetRelationName, LogicalRepRelMapEntry::remoterel, ResultRelInfo::ri_RelationDesc, slot_modify_data(), TargetPrivilegesCheck(), and ApplyExecutionData::targetRel.

Referenced by apply_handle_update().

◆ apply_spooled_messages()

static void apply_spooled_messages ( TransactionId  xid,
XLogRecPtr  lsn 
)
static

Definition at line 1350 of file worker.c.

1351 {
1353  int nchanges;
1354  char path[MAXPGPATH];
1355  char *buffer = NULL;
1356  MemoryContext oldcxt;
1357  BufFile *fd;
1358 
1360 
1361  /* Make sure we have an open transaction */
1363 
1364  /*
1365  * Allocate file handle and memory required to process all the messages in
1366  * TopTransactionContext to avoid them getting reset after each message is
1367  * processed.
1368  */
1370 
1371  /* Open the spool file for the committed/prepared transaction */
1373  elog(DEBUG1, "replaying changes from file \"%s\"", path);
1374 
1376  false);
1377 
1378  buffer = palloc(BLCKSZ);
1379  initStringInfo(&s2);
1380 
1381  MemoryContextSwitchTo(oldcxt);
1382 
1383  remote_final_lsn = lsn;
1384 
1385  /*
1386  * Make sure the handle apply_dispatch methods are aware we're in a remote
1387  * transaction.
1388  */
1389  in_remote_transaction = true;
1391 
1393 
1394  /*
1395  * Read the entries one by one and pass them through the same logic as in
1396  * apply_dispatch.
1397  */
1398  nchanges = 0;
1399  while (true)
1400  {
1401  int nbytes;
1402  int len;
1403 
1405 
1406  /* read length of the on-disk record */
1407  nbytes = BufFileRead(fd, &len, sizeof(len));
1408 
1409  /* have we reached end of the file? */
1410  if (nbytes == 0)
1411  break;
1412 
1413  /* do we have a correct length? */
1414  if (nbytes != sizeof(len))
1415  ereport(ERROR,
1417  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1418  path)));
1419 
1420  if (len <= 0)
1421  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1422  len, path);
1423 
1424  /* make sure we have sufficiently large buffer */
1425  buffer = repalloc(buffer, len);
1426 
1427  /* and finally read the data into the buffer */
1428  if (BufFileRead(fd, buffer, len) != len)
1429  ereport(ERROR,
1431  errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1432  path)));
1433 
1434  /* copy the buffer to the stringinfo and call apply_dispatch */
1435  resetStringInfo(&s2);
1436  appendBinaryStringInfo(&s2, buffer, len);
1437 
1438  /* Ensure we are reading the data into our memory context. */
1440 
1441  apply_dispatch(&s2);
1442 
1444 
1445  MemoryContextSwitchTo(oldcxt);
1446 
1447  nchanges++;
1448 
1449  if (nchanges % 1000 == 0)
1450  elog(DEBUG1, "replayed %d changes from file \"%s\"",
1451  nchanges, path);
1452  }
1453 
1454  BufFileClose(fd);
1455 
1456  pfree(buffer);
1457  pfree(s2.data);
1458 
1459  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1460  nchanges, path);
1461 
1462  return;
1463 }
static MemoryContext ApplyMessageContext
Definition: worker.c:245
static void apply_dispatch(StringInfo s)
Definition: worker.c:2471
size_t BufFileRead(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:582
int errcode_for_file_access(void)
Definition: elog.c:716
MemoryContext TopTransactionContext
Definition: mcxt.c:53
void pfree(void *pointer)
Definition: mcxt.c:1175
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1188
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
const void size_t len
char * s2
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59

References appendBinaryStringInfo(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileClose(), BufFileOpenFileSet(), BufFileRead(), changes_filename(), CHECK_FOR_INTERRUPTS, DEBUG1, elog, end_replication_step(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), in_remote_transaction, initStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pfree(), pgstat_report_activity(), remote_final_lsn, repalloc(), resetStringInfo(), s2, STATE_RUNNING, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, and TopTransactionContext.

Referenced by apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 3573 of file worker.c.

3574 {
3575  int worker_slot = DatumGetInt32(main_arg);
3576  MemoryContext oldctx;
3577  char originname[NAMEDATALEN];
3578  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
3579  char *myslotname = NULL;
3581  int server_version;
3582 
3583  /* Attach to slot */
3584  logicalrep_worker_attach(worker_slot);
3585 
3586  /* Setup signal handling */
3588  pqsignal(SIGTERM, die);
3590 
3591  /*
3592  * We don't currently need any ResourceOwner in a walreceiver process, but
3593  * if we did, we could call CreateAuxProcessResourceOwner here.
3594  */
3595 
3596  /* Initialise stats to a sanish value */
3599 
3600  /* Load the libpq-specific functions */
3601  load_file("libpqwalreceiver", false);
3602 
3603  /* Run as replica session replication role. */
3604  SetConfigOption("session_replication_role", "replica",
3606 
3607  /* Connect to our database. */
3610  0);
3611 
3612  /*
3613  * Set always-secure search path, so malicious users can't redirect user
3614  * code (e.g. pg_index.indexprs).
3615  */
3616  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3617 
3618  /* Load the subscription into persistent memory context. */
3620  "ApplyContext",
3624 
3626  if (!MySubscription)
3627  {
3628  ereport(LOG,
3629  (errmsg("logical replication apply worker for subscription %u will not "
3630  "start because the subscription was removed during startup",
3632  proc_exit(0);
3633  }
3634 
3635  MySubscriptionValid = true;
3636  MemoryContextSwitchTo(oldctx);
3637 
3638  if (!MySubscription->enabled)
3639  {
3640  ereport(LOG,
3641  (errmsg("logical replication apply worker for subscription \"%s\" will not "
3642  "start because the subscription was disabled during startup",
3643  MySubscription->name)));
3644 
3645  proc_exit(0);
3646  }
3647 
3648  /* Setup synchronous commit according to the user's wishes */
3649  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3651 
3652  /* Keep us informed about subscription changes. */
3655  (Datum) 0);
3656 
3657  if (am_tablesync_worker())
3658  ereport(LOG,
3659  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3661  else
3662  ereport(LOG,
3663  (errmsg("logical replication apply worker for subscription \"%s\" has started",
3664  MySubscription->name)));
3665 
3667 
3668  /* Connect to the origin and start the replication. */
3669  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3671 
3672  if (am_tablesync_worker())
3673  {
3674  start_table_sync(&origin_startpos, &myslotname);
3675 
3676  /*
3677  * Allocate the origin name in long-lived context for error context
3678  * message.
3679  */
3682  originname,
3683  sizeof(originname));
3685  originname);
3686  }
3687  else
3688  {
3689  /* This is main apply worker */
3690  RepOriginId originid;
3691  TimeLineID startpointTLI;
3692  char *err;
3693 
3694  myslotname = MySubscription->slotname;
3695 
3696  /*
3697  * This shouldn't happen if the subscription is enabled, but guard
3698  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3699  * crash if slot is NULL.)
3700  */
3701  if (!myslotname)
3702  ereport(ERROR,
3703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3704  errmsg("subscription has no replication slot set")));
3705 
3706  /* Setup replication origin tracking. */
3708  snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3709  originid = replorigin_by_name(originname, true);
3710  if (!OidIsValid(originid))
3711  originid = replorigin_create(originname);
3712  replorigin_session_setup(originid);
3713  replorigin_session_origin = originid;
3714  origin_startpos = replorigin_session_get_progress(false);
3716 
3718  MySubscription->name, &err);
3719  if (LogRepWorkerWalRcvConn == NULL)
3720  ereport(ERROR,
3721  (errcode(ERRCODE_CONNECTION_FAILURE),
3722  errmsg("could not connect to the publisher: %s", err)));
3723 
3724  /*
3725  * We don't really use the output identify_system for anything but it
3726  * does some initializations on the upstream so let's still call it.
3727  */
3728  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3729 
3730  /*
3731  * Allocate the origin name in long-lived context for error context
3732  * message.
3733  */
3735  originname);
3736  }
3737 
3738  /*
3739  * Setup callback for syscache so that we know when something changes in
3740  * the subscription relation state.
3741  */
3744  (Datum) 0);
3745 
3746  /* Build logical replication streaming options. */
3747  options.logical = true;
3748  options.startpoint = origin_startpos;
3749  options.slotname = myslotname;
3750 
3752  options.proto.logical.proto_version =
3756 
3757  options.proto.logical.publication_names = MySubscription->publications;
3758  options.proto.logical.binary = MySubscription->binary;
3759  options.proto.logical.streaming = MySubscription->stream;
3760  options.proto.logical.twophase = false;
3761 
3762  if (!am_tablesync_worker())
3763  {
3764  /*
3765  * Even when the two_phase mode is requested by the user, it remains
3766  * as the tri-state PENDING until all tablesyncs have reached READY
3767  * state. Only then, can it become ENABLED.
3768  *
3769  * Note: If the subscription has no tables then leave the state as
3770  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
3771  * work.
3772  */
3775  {
3776  /* Start streaming with two_phase enabled */
3777  options.proto.logical.twophase = true;
3779 
3784  }
3785  else
3786  {
3788  }
3789 
3790  ereport(DEBUG1,
3791  (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
3796  "?")));
3797  }
3798  else
3799  {
3800  /* Start normal logical streaming replication. */
3802  }
3803 
3804  /* Run the main loop. */
3805  start_apply(origin_startpos);
3806 
3807  proc_exit(0);
3808 }
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:3502
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3117
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:3545
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
static bool MySubscriptionValid
Definition: worker.c:254
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
#define OidIsValid(objectId)
Definition: c.h:721
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
#define LOG
Definition: elog.h:25
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:8376
@ PGC_S_OVERRIDE
Definition: guc.h:120
@ PGC_SUSET
Definition: guc.h:75
@ PGC_BACKEND
Definition: guc.h:74
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_attach(int slot)
Definition: launcher.c:564
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:37
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
MemoryContext TopMemoryContext
Definition: mcxt.c:48
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1292
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
RepOriginId replorigin_session_origin
Definition: origin.c:155
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:85
static char ** options
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:95
#define snprintf
Definition: port.h:225
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetInt32(X)
Definition: postgres.h:516
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5713
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5684
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
@ SUBSCRIPTIONOID
Definition: syscache.h:99
bool AllTablesyncsReady(void)
Definition: tablesync.c:1519
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:1168
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1544
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:418
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:412
#define SIGHUP
Definition: win32_port.h:167
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AllTablesyncsReady(), am_tablesync_worker(), apply_error_callback_arg, ApplyContext, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32, LogicalRepWorker::dbid, DEBUG1, die, elog, Subscription::enabled, ereport, errcode(), errmsg(), ERROR, get_rel_name(), GetCurrentTimestamp(), GetSubscription(), invalidate_syncing_table_states(), InvalidXLogRecPtr, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOG, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogRepWorkerWalRcvConn, MemoryContextStrdup(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, ApplyErrorCallbackArg::origin_name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, pqsignal(), proc_exit(), Subscription::publications, LogicalRepWorker::relid, ReplicationOriginNameForTablesync(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, SetConfigOption(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, snprintf, start_apply(), start_table_sync(), StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, SUBSCRIPTIONRELMAP, Subscription::synccommit, TopMemoryContext, Subscription::twophasestate, UpdateTwoPhaseState(), LogicalRepWorker::userid, walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

◆ begin_replication_step()

◆ changes_filename()

static void changes_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 3326 of file worker.c.

3327 {
3328  snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
3329 }

References MAXPGPATH, and snprintf.

Referenced by apply_handle_stream_abort(), apply_spooled_messages(), stream_cleanup_files(), and stream_open_file().

◆ check_relation_updatable()

static void check_relation_updatable ( LogicalRepRelMapEntry rel)
static

Definition at line 1739 of file worker.c.

1740 {
1741  /*
1742  * For partitioned tables, we only need to care if the target partition is
1743  * updatable (aka has PK or RI defined for it).
1744  */
1745  if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1746  return;
1747 
1748  /* Updatable, no error. */
1749  if (rel->updatable)
1750  return;
1751 
1752  /*
1753  * We are in error mode so it's fine this is somewhat slow. It's better to
1754  * give user correct error.
1755  */
1757  {
1758  ereport(ERROR,
1759  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1760  errmsg("publisher did not send replica identity column "
1761  "expected by the logical replication target relation \"%s.%s\"",
1762  rel->remoterel.nspname, rel->remoterel.relname)));
1763  }
1764 
1765  ereport(ERROR,
1766  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1767  errmsg("logical replication target relation \"%s.%s\" has "
1768  "neither REPLICA IDENTITY index nor PRIMARY "
1769  "KEY and published relation does not have "
1770  "REPLICA IDENTITY FULL",
1771  rel->remoterel.nspname, rel->remoterel.relname)));
1772 }
static Oid GetRelationIdentityOrPK(Relation rel)
Definition: worker.c:1595

References ereport, errcode(), errmsg(), ERROR, GetRelationIdentityOrPK(), LogicalRepRelMapEntry::localrel, LogicalRepRelation::nspname, OidIsValid, RelationData::rd_rel, LogicalRepRelation::relname, LogicalRepRelMapEntry::remoterel, and LogicalRepRelMapEntry::updatable.

Referenced by apply_handle_delete(), apply_handle_tuple_routing(), and apply_handle_update().

◆ cleanup_subxact_info()

static void cleanup_subxact_info ( void  )
inlinestatic

◆ clear_subscription_skip_lsn()

static void clear_subscription_skip_lsn ( XLogRecPtr  finish_lsn)
static

Definition at line 3909 of file worker.c.

3910 {
3911  Relation rel;
3912  Form_pg_subscription subform;
3913  HeapTuple tup;
3914  XLogRecPtr myskiplsn = MySubscription->skiplsn;
3915  bool started_tx = false;
3916 
3917  if (likely(XLogRecPtrIsInvalid(myskiplsn)))
3918  return;
3919 
3920  if (!IsTransactionState())
3921  {
3923  started_tx = true;
3924  }
3925 
3926  /*
3927  * Protect subskiplsn of pg_subscription from being concurrently updated
3928  * while clearing it.
3929  */
3930  LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
3931  AccessShareLock);
3932 
3933  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
3934 
3935  /* Fetch the existing tuple. */
3938 
3939  if (!HeapTupleIsValid(tup))
3940  elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
3941 
3942  subform = (Form_pg_subscription) GETSTRUCT(tup);
3943 
3944  /*
3945  * Clear the subskiplsn. If the user has already changed subskiplsn before
3946  * clearing it we don't update the catalog and the replication origin
3947  * state won't get advanced. So in the worst case, if the server crashes
3948  * before sending an acknowledgment of the flush position the transaction
3949  * will be sent again and the user needs to set subskiplsn again. We can
3950  * reduce the possibility by logging a replication origin WAL record to
3951  * advance the origin LSN instead but there is no way to advance the
3952  * origin timestamp and it doesn't seem to be worth doing anything about
3953  * it since it's a very rare case.
3954  */
3955  if (subform->subskiplsn == myskiplsn)
3956  {
3957  bool nulls[Natts_pg_subscription];
3958  bool replaces[Natts_pg_subscription];
3959  Datum values[Natts_pg_subscription];
3960 
3961  memset(values, 0, sizeof(values));
3962  memset(nulls, false, sizeof(nulls));
3963  memset(replaces, false, sizeof(replaces));
3964 
3965  /* reset subskiplsn */
3966  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
3967  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
3968 
3969  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
3970  replaces);
3971  CatalogTupleUpdate(rel, &tup->t_self, tup);
3972 
3973  if (myskiplsn != finish_lsn)
3974  ereport(WARNING,
3975  errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
3976  errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
3977  LSN_FORMAT_ARGS(finish_lsn),
3978  LSN_FORMAT_ARGS(myskiplsn)));
3979  }
3980 
3981  heap_freetuple(tup);
3982  table_close(rel, NoLock);
3983 
3984  if (started_tx)
3986 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define likely(x)
Definition: c.h:283
int errdetail(const char *fmt,...)
Definition: elog.c:1037
#define WARNING
Definition: elog.h:30
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessShareLock
Definition: lockdefs.h:36
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
FormData_pg_subscription * Form_pg_subscription
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
ItemPointerData t_self
Definition: htup.h:65
XLogRecPtr skiplsn
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179

References AccessShareLock, CatalogTupleUpdate(), CommitTransactionCommand(), elog, ereport, errdetail(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, IsTransactionState(), likely, LockSharedObject(), LSN_FORMAT_ARGS, LSNGetDatum, MySubscription, Subscription::name, NoLock, ObjectIdGetDatum, Subscription::oid, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, Subscription::skiplsn, StartTransactionCommand(), SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), values, WARNING, and XLogRecPtrIsInvalid.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ create_edata_for_relation()

static ApplyExecutionData* create_edata_for_relation ( LogicalRepRelMapEntry rel)
static

Definition at line 475 of file worker.c.

476 {
477  ApplyExecutionData *edata;
478  EState *estate;
479  RangeTblEntry *rte;
480  ResultRelInfo *resultRelInfo;
481 
482  edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
483  edata->targetRel = rel;
484 
485  edata->estate = estate = CreateExecutorState();
486 
487  rte = makeNode(RangeTblEntry);
488  rte->rtekind = RTE_RELATION;
489  rte->relid = RelationGetRelid(rel->localrel);
490  rte->relkind = rel->localrel->rd_rel->relkind;
492  ExecInitRangeTable(estate, list_make1(rte));
493 
494  edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
495 
496  /*
497  * Use Relation opened by logicalrep_rel_open() instead of opening it
498  * again.
499  */
500  InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
501 
502  /*
503  * We put the ResultRelInfo in the es_opened_result_relations list, even
504  * though we don't populate the es_result_relations array. That's a bit
505  * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
506  *
507  * ExecOpenIndices() is not called here either, each execution path doing
508  * an apply operation being responsible for that.
509  */
511  lappend(estate->es_opened_result_relations, resultRelInfo);
512 
513  estate->es_output_cid = GetCurrentCommandId(true);
514 
515  /* Prepare to catch AFTER triggers. */
517 
518  /* other fields of edata remain NULL for now */
519 
520  return edata;
521 }
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options)
Definition: execMain.c:1195
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
EState * CreateExecutorState(void)
Definition: execUtils.c:90
void * palloc0(Size size)
Definition: mcxt.c:1099
@ RTE_RELATION
Definition: parsenodes.h:998
#define list_make1(x1)
Definition: pg_list.h:206
#define RelationGetRelid(relation)
Definition: rel.h:488
List * es_opened_result_relations
Definition: execnodes.h:610
CommandId es_output_cid
Definition: execnodes.h:604
RTEKind rtekind
Definition: parsenodes.h:1015
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4961
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:814

References AccessShareLock, AfterTriggerBeginQuery(), CreateExecutorState(), EState::es_opened_result_relations, EState::es_output_cid, ApplyExecutionData::estate, ExecInitRangeTable(), GetCurrentCommandId(), InitResultRelInfo(), lappend(), list_make1, LogicalRepRelMapEntry::localrel, makeNode, palloc0(), RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, RangeTblEntry::rtekind, ApplyExecutionData::targetRel, and ApplyExecutionData::targetRelInfo.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ DisableSubscriptionAndExit()

static void DisableSubscriptionAndExit ( void  )
static

Definition at line 3815 of file worker.c.

3816 {
3817  /*
3818  * Emit the error message, and recover from the error state to an idle
3819  * state
3820  */
3821  HOLD_INTERRUPTS();
3822 
3823  EmitErrorReport();
3825  FlushErrorState();
3826 
3828 
3829  /* Report the worker failed during either table synchronization or apply */
3831  !am_tablesync_worker());
3832 
3833  /* Disable the subscription */
3837 
3838  /* Notify the subscription has been disabled and exit */
3839  ereport(LOG,
3840  errmsg("logical replication subscription \"%s\" has been disabled due to an error",
3841  MySubscription->name));
3842 
3843  proc_exit(0);
3844 }
void EmitErrorReport(void)
Definition: elog.c:1504
void FlushErrorState(void)
Definition: elog.c:1649
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4662

References AbortOutOfAnyTransaction(), am_tablesync_worker(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ end_replication_step()

◆ FindReplTupleInLocalRel()

static bool FindReplTupleInLocalRel ( EState estate,
Relation  localrel,
LogicalRepRelation remoterel,
TupleTableSlot remoteslot,
TupleTableSlot **  localslot 
)
static

Definition at line 2080 of file worker.c.

2084 {
2085  Oid idxoid;
2086  bool found;
2087 
2088  /*
2089  * Regardless of the top-level operation, we're performing a read here, so
2090  * check for SELECT privileges.
2091  */
2092  TargetPrivilegesCheck(localrel, ACL_SELECT);
2093 
2094  *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2095 
2096  idxoid = GetRelationIdentityOrPK(localrel);
2097  Assert(OidIsValid(idxoid) ||
2098  (remoterel->replident == REPLICA_IDENTITY_FULL));
2099 
2100  if (OidIsValid(idxoid))
2101  found = RelationFindReplTupleByIndex(localrel, idxoid,
2103  remoteslot, *localslot);
2104  else
2105  found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2106  remoteslot, *localslot);
2107 
2108  return found;
2109 }
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
@ LockTupleExclusive
Definition: lockoptions.h:58
#define ACL_SELECT
Definition: parsenodes.h:83

References ACL_SELECT, Assert(), EState::es_tupleTable, GetRelationIdentityOrPK(), LockTupleExclusive, OidIsValid, RelationFindReplTupleByIndex(), RelationFindReplTupleSeq(), LogicalRepRelation::replident, table_slot_create(), and TargetPrivilegesCheck().

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ finish_edata()

static void finish_edata ( ApplyExecutionData edata)
static

Definition at line 528 of file worker.c.

529 {
530  EState *estate = edata->estate;
531 
532  /* Handle any queued AFTER triggers. */
533  AfterTriggerEndQuery(estate);
534 
535  /* Shut down tuple routing, if any was done. */
536  if (edata->proute)
537  ExecCleanupTupleRouting(edata->mtstate, edata->proute);
538 
539  /*
540  * Cleanup. It might seem that we should call ExecCloseResultRelations()
541  * here, but we intentionally don't. It would close the rel we added to
542  * es_opened_result_relations above, which is wrong because we took no
543  * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
544  * any other relations opened during execution.
545  */
546  ExecResetTupleTable(estate->es_tupleTable, false);
547  FreeExecutorState(estate);
548  pfree(edata);
549 }
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1191
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4981

References AfterTriggerEndQuery(), EState::es_tupleTable, ApplyExecutionData::estate, ExecCleanupTupleRouting(), ExecResetTupleTable(), FreeExecutorState(), ApplyExecutionData::mtstate, pfree(), and ApplyExecutionData::proute.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ get_flush_position()

static void get_flush_position ( XLogRecPtr write,
XLogRecPtr flush,
bool have_pending_txes 
)
static

Definition at line 2591 of file worker.c.

2593 {
2594  dlist_mutable_iter iter;
2595  XLogRecPtr local_flush = GetFlushRecPtr(NULL);
2596 
2598  *flush = InvalidXLogRecPtr;
2599 
2601  {
2602  FlushPosition *pos =
2603  dlist_container(FlushPosition, node, iter.cur);
2604 
2605  *write = pos->remote_end;
2606 
2607  if (pos->local_end <= local_flush)
2608  {
2609  *flush = pos->remote_end;
2610  dlist_delete(iter.cur);
2611  pfree(pos);
2612  }
2613  else
2614  {
2615  /*
2616  * Don't want to uselessly iterate over the rest of the list which
2617  * could potentially be long. Instead get the last element and
2618  * grab the write position from there.
2619  */
2620  pos = dlist_tail_element(FlushPosition, node,
2621  &lsn_mapping);
2622  *write = pos->remote_end;
2623  *have_pending_txes = true;
2624  return;
2625  }
2626  }
2627 
2628  *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2629 }
static dlist_head lsn_mapping
Definition: worker.c:208
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:515
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
#define write(a, b, c)
Definition: win32.h:14
XLogRecPtr remote_end
Definition: worker.c:205
XLogRecPtr local_end
Definition: worker.c:204
dlist_node * cur
Definition: ilist.h:180
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:5927

References dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), dlist_tail_element, GetFlushRecPtr(), InvalidXLogRecPtr, FlushPosition::local_end, lsn_mapping, pfree(), FlushPosition::remote_end, and write.

Referenced by send_feedback().

◆ GetRelationIdentityOrPK()

static Oid GetRelationIdentityOrPK ( Relation  rel)
static

Definition at line 1595 of file worker.c.

1596 {
1597  Oid idxoid;
1598 
1599  idxoid = RelationGetReplicaIndex(rel);
1600 
1601  if (!OidIsValid(idxoid))
1602  idxoid = RelationGetPrimaryKeyIndex(rel);
1603 
1604  return idxoid;
1605 }
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4863
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4884

References OidIsValid, RelationGetPrimaryKeyIndex(), and RelationGetReplicaIndex().

Referenced by check_relation_updatable(), and FindReplTupleInLocalRel().

◆ handle_streamed_transaction()

static bool handle_streamed_transaction ( LogicalRepMsgType  action,
StringInfo  s 
)
static

Definition at line 437 of file worker.c.

438 {
439  TransactionId xid;
440 
441  /* not in streaming mode */
443  return false;
444 
445  Assert(stream_fd != NULL);
447 
448  /*
449  * We should have received XID of the subxact as the first part of the
450  * message, so extract it.
451  */
452  xid = pq_getmsgint(s, 4);
453 
454  if (!TransactionIdIsValid(xid))
455  ereport(ERROR,
456  (errcode(ERRCODE_PROTOCOL_VIOLATION),
457  errmsg_internal("invalid transaction ID in streamed replication transaction")));
458 
459  /* Add the new subxact to the array (unless already there). */
460  subxact_info_add(xid);
461 
462  /* write the change to the current file */
464 
465  return true;
466 }
static void subxact_info_add(TransactionId xid)
Definition: worker.c:3241
static BufFile * stream_fd
Definition: worker.c:280
static void stream_write_change(char action, StringInfo s)
Definition: worker.c:3437
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417

References generate_unaccent_rules::action, Assert(), ereport, errcode(), errmsg_internal(), ERROR, in_streamed_transaction, pq_getmsgint(), stream_fd, stream_write_change(), stream_xid, subxact_info_add(), and TransactionIdIsValid.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_relation(), apply_handle_truncate(), apply_handle_type(), and apply_handle_update().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 3850 of file worker.c.

3851 {
3852  return MyLogicalRepWorker != NULL;
3853 }

References MyLogicalRepWorker.

Referenced by ProcessInterrupts().

◆ LogicalRepApplyLoop()

static void LogicalRepApplyLoop ( XLogRecPtr  last_received)
static

Definition at line 2670 of file worker.c.

2671 {
2672  TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2673  bool ping_sent = false;
2674  TimeLineID tli;
2675  ErrorContextCallback errcallback;
2676 
2677  /*
2678  * Init the ApplyMessageContext which we clean up after each replication
2679  * protocol message.
2680  */
2682  "ApplyMessageContext",
2684 
2685  /*
2686  * This memory context is used for per-stream data when the streaming mode
2687  * is enabled. This context is reset on each stream stop.
2688  */
2690  "LogicalStreamingContext",
2692 
2693  /* mark as idle, before starting to loop */
2695 
2696  /*
2697  * Push apply error context callback. Fields will be filled while applying
2698  * a change.
2699  */
2700  errcallback.callback = apply_error_callback;
2701  errcallback.previous = error_context_stack;
2702  error_context_stack = &errcallback;
2703 
2704  /* This outer loop iterates once per wait. */
2705  for (;;)
2706  {
2708  int rc;
2709  int len;
2710  char *buf = NULL;
2711  bool endofstream = false;
2712  long wait_time;
2713 
2715 
2717 
2719 
2720  if (len != 0)
2721  {
2722  /* Loop to process all available data (without blocking). */
2723  for (;;)
2724  {
2726 
2727  if (len == 0)
2728  {
2729  break;
2730  }
2731  else if (len < 0)
2732  {
2733  ereport(LOG,
2734  (errmsg("data stream from publisher has ended")));
2735  endofstream = true;
2736  break;
2737  }
2738  else
2739  {
2740  int c;
2741  StringInfoData s;
2742 
2743  /* Reset timeout. */
2744  last_recv_timestamp = GetCurrentTimestamp();
2745  ping_sent = false;
2746 
2747  /* Ensure we are reading the data into our memory context. */
2749 
2750  s.data = buf;
2751  s.len = len;
2752  s.cursor = 0;
2753  s.maxlen = -1;
2754 
2755  c = pq_getmsgbyte(&s);
2756 
2757  if (c == 'w')
2758  {
2759  XLogRecPtr start_lsn;
2760  XLogRecPtr end_lsn;
2761  TimestampTz send_time;
2762 
2763  start_lsn = pq_getmsgint64(&s);
2764  end_lsn = pq_getmsgint64(&s);
2765  send_time = pq_getmsgint64(&s);
2766 
2767  if (last_received < start_lsn)
2768  last_received = start_lsn;
2769 
2770  if (last_received < end_lsn)
2771  last_received = end_lsn;
2772 
2773  UpdateWorkerStats(last_received, send_time, false);
2774 
2775  apply_dispatch(&s);
2776  }
2777  else if (c == 'k')
2778  {
2779  XLogRecPtr end_lsn;
2781  bool reply_requested;
2782 
2783  end_lsn = pq_getmsgint64(&s);
2784  timestamp = pq_getmsgint64(&s);
2785  reply_requested = pq_getmsgbyte(&s);
2786 
2787  if (last_received < end_lsn)
2788  last_received = end_lsn;
2789 
2790  send_feedback(last_received, reply_requested, false);
2791  UpdateWorkerStats(last_received, timestamp, true);
2792  }
2793  /* other message types are purposefully ignored */
2794 
2796  }
2797 
2799  }
2800  }
2801 
2802  /* confirm all writes so far */
2803  send_feedback(last_received, false, false);
2804 
2806  {
2807  /*
2808  * If we didn't get any transactions for a while there might be
2809  * unconsumed invalidation messages in the queue, consume them
2810  * now.
2811  */
2814 
2815  /* Process any table synchronization changes. */
2816  process_syncing_tables(last_received);
2817  }
2818 
2819  /* Cleanup the memory. */
2822 
2823  /* Check if we need to exit the streaming loop. */
2824  if (endofstream)
2825  break;
2826 
2827  /*
2828  * Wait for more data or latch. If we have unflushed transactions,
2829  * wake up after WalWriterDelay to see if they've been flushed yet (in
2830  * which case we should send a feedback message). Otherwise, there's
2831  * no particular urgency about waking up unless we get data or a
2832  * signal.
2833  */
2834  if (!dlist_is_empty(&lsn_mapping))
2835  wait_time = WalWriterDelay;
2836  else
2837  wait_time = NAPTIME_PER_CYCLE;
2838 
2842  fd, wait_time,
2844 
2845  if (rc & WL_LATCH_SET)
2846  {
2849  }
2850 
2851  if (ConfigReloadPending)
2852  {
2853  ConfigReloadPending = false;
2855  }
2856 
2857  if (rc & WL_TIMEOUT)
2858  {
2859  /*
2860  * We didn't receive anything new. If we haven't heard anything
2861  * from the server for more than wal_receiver_timeout / 2, ping
2862  * the server. Also, if it's been longer than
2863  * wal_receiver_status_interval since the last update we sent,
2864  * send a status update to the primary anyway, to report any
2865  * progress in applying WAL.
2866  */
2867  bool requestReply = false;
2868 
2869  /*
2870  * Check if time since last receive from primary has reached the
2871  * configured limit.
2872  */
2873  if (wal_receiver_timeout > 0)
2874  {
2876  TimestampTz timeout;
2877 
2878  timeout =
2879  TimestampTzPlusMilliseconds(last_recv_timestamp,
2881 
2882  if (now >= timeout)
2883  ereport(ERROR,
2884  (errcode(ERRCODE_CONNECTION_FAILURE),
2885  errmsg("terminating logical replication worker due to timeout")));
2886 
2887  /* Check to see if it's time for a ping. */
2888  if (!ping_sent)
2889  {
2890  timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2891  (wal_receiver_timeout / 2));
2892  if (now >= timeout)
2893  {
2894  requestReply = true;
2895  ping_sent = true;
2896  }
2897  }
2898  }
2899 
2900  send_feedback(last_received, requestReply, requestReply);
2901 
2902  /*
2903  * Force reporting to ensure long idle periods don't lead to
2904  * arbitrarily delayed stats. Stats can only be reported outside
2905  * of (implicit or explicit) transactions. That shouldn't lead to
2906  * stats being delayed for long, because transactions are either
2907  * sent as a whole on commit or streamed. Streamed transactions
2908  * are spilled to disk and applied on commit.
2909  */
2910  if (!IsTransactionState())
2911  pgstat_report_stat(true);
2912  }
2913  }
2914 
2915  /* Pop the error context stack */
2916  error_context_stack = errcallback.previous;
2917 
2918  /* All done */
2920 }
static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
Definition: worker.c:2654
#define NAPTIME_PER_CYCLE
Definition: worker.c:199
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
Definition: worker.c:2929
static void apply_error_callback(void *arg)
Definition: worker.c:3990
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
int64 TimestampTz
Definition: timestamp.h:39
ErrorContextCallback * error_context_stack
Definition: elog.c:93
struct Latch * MyLatch
Definition: globals.c:58
@ PGC_SIGHUP
Definition: guc.h:72
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define MemoryContextResetAndDeleteChildren(ctx)
Definition: memutils.h:67
static char * buf
Definition: pg_test_fsync.c:67
int64 timestamp
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
char * c
struct ErrorContextCallback * previous
Definition: elog.h:232
void(* callback)(void *arg)
Definition: elog.h:233
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
@ WAIT_EVENT_LOGICAL_APPLY_MAIN
Definition: wait_event.h:43
int wal_receiver_timeout
Definition: walreceiver.c:91
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
int WalWriterDelay
Definition: walwriter.c:70

References AcceptInvalidationMessages(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, buf, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, StringInfoData::cursor, StringInfoData::data, dlist_is_empty(), ereport, errcode(), errmsg(), ERROR, error_context_stack, fd(), GetCurrentTimestamp(), in_remote_transaction, in_streamed_transaction, IsTransactionState(), StringInfoData::len, len, LOG, LogicalStreamingContext, LogRepWorkerWalRcvConn, lsn_mapping, StringInfoData::maxlen, maybe_reread_subscription(), MemoryContextReset(), MemoryContextResetAndDeleteChildren, MemoryContextSwitchTo(), MyLatch, NAPTIME_PER_CYCLE, now(), PGC_SIGHUP, PGINVALID_SOCKET, pgstat_report_activity(), pgstat_report_stat(), pq_getmsgbyte(), pq_getmsgint64(), ErrorContextCallback::previous, process_syncing_tables(), ProcessConfigFile(), ResetLatch(), send_feedback(), STATE_IDLE, TimestampTzPlusMilliseconds, TopMemoryContext, UpdateWorkerStats(), WAIT_EVENT_LOGICAL_APPLY_MAIN, WaitLatchOrSocket(), wal_receiver_timeout, walrcv_endstreaming, walrcv_receive, WalWriterDelay, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by start_apply().

◆ maybe_reread_subscription()

static void maybe_reread_subscription ( void  )
static

Definition at line 3018 of file worker.c.

3019 {
3020  MemoryContext oldctx;
3022  bool started_tx = false;
3023 
3024  /* When cache state is valid there is nothing to do here. */
3025  if (MySubscriptionValid)
3026  return;
3027 
3028  /* This function might be called inside or outside of transaction. */
3029  if (!IsTransactionState())
3030  {
3032  started_tx = true;
3033  }
3034 
3035  /* Ensure allocations in permanent context. */
3037 
3039 
3040  /*
3041  * Exit if the subscription was removed. This normally should not happen
3042  * as the worker gets killed during DROP SUBSCRIPTION.
3043  */
3044  if (!newsub)
3045  {
3046  ereport(LOG,
3047  (errmsg("logical replication apply worker for subscription \"%s\" will "
3048  "stop because the subscription was removed",
3049  MySubscription->name)));
3050 
3051  proc_exit(0);
3052  }
3053 
3054  /* Exit if the subscription was disabled. */
3055  if (!newsub->enabled)
3056  {
3057  ereport(LOG,
3058  (errmsg("logical replication apply worker for subscription \"%s\" will "
3059  "stop because the subscription was disabled",
3060  MySubscription->name)));
3061 
3062  proc_exit(0);
3063  }
3064 
3065  /* !slotname should never happen when enabled is true. */
3066  Assert(newsub->slotname);
3067 
3068  /* two-phase should not be altered */
3069  Assert(newsub->twophasestate == MySubscription->twophasestate);
3070 
3071  /*
3072  * Exit if any parameter that affects the remote connection was changed.
3073  * The launcher will start a new worker.
3074  */
3075  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3076  strcmp(newsub->name, MySubscription->name) != 0 ||
3077  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3078  newsub->binary != MySubscription->binary ||
3079  newsub->stream != MySubscription->stream ||
3080  newsub->owner != MySubscription->owner ||
3081  !equal(newsub->publications, MySubscription->publications))
3082  {
3083  ereport(LOG,
3084  (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
3085  MySubscription->name)));
3086 
3087  proc_exit(0);
3088  }
3089 
3090  /* Check for other changes that should never happen too. */
3091  if (newsub->dbid != MySubscription->dbid)
3092  {
3093  elog(ERROR, "subscription %u changed unexpectedly",
3095  }
3096 
3097  /* Clean old subscription info and switch to new one. */
3100 
3101  MemoryContextSwitchTo(oldctx);
3102 
3103  /* Change synchronous commit according to the user's wishes */
3104  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3106 
3107  if (started_tx)
3109 
3110  MySubscriptionValid = true;
3111 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3564
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389

References ApplyContext, Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog, equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::owner, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), and LogicalRepApplyLoop().

◆ maybe_start_skipping_changes()

static void maybe_start_skipping_changes ( XLogRecPtr  finish_lsn)
static

Definition at line 3860 of file worker.c.

3861 {
3865 
3866  /*
3867  * Quick return if it's not requested to skip this transaction. This
3868  * function is called for every remote transaction and we assume that
3869  * skipping the transaction is not used often.
3870  */
3872  MySubscription->skiplsn != finish_lsn))
3873  return;
3874 
3875  /* Start skipping all changes of this transaction */
3876  skip_xact_finish_lsn = finish_lsn;
3877 
3878  ereport(LOG,
3879  errmsg("start skipping logical replication transaction finished at %X/%X",
3881 }
static XLogRecPtr skip_xact_finish_lsn
Definition: worker.c:276

References Assert(), ereport, errmsg(), in_remote_transaction, in_streamed_transaction, is_skipping_changes, likely, LOG, LSN_FORMAT_ARGS, MySubscription, skip_xact_finish_lsn, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by apply_handle_begin(), apply_handle_begin_prepare(), and apply_spooled_messages().

◆ reset_apply_error_context_info()

◆ send_feedback()

static void send_feedback ( XLogRecPtr  recvpos,
bool  force,
bool  requestReply 
)
static

Definition at line 2929 of file worker.c.

2930 {
2931  static StringInfo reply_message = NULL;
2932  static TimestampTz send_time = 0;
2933 
2934  static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2935  static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2936  static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2937 
2938  XLogRecPtr writepos;
2939  XLogRecPtr flushpos;
2940  TimestampTz now;
2941  bool have_pending_txes;
2942 
2943  /*
2944  * If the user doesn't want status to be reported to the publisher, be
2945  * sure to exit before doing anything at all.
2946  */
2947  if (!force && wal_receiver_status_interval <= 0)
2948  return;
2949 
2950  /* It's legal to not pass a recvpos */
2951  if (recvpos < last_recvpos)
2952  recvpos = last_recvpos;
2953 
2954  get_flush_position(&writepos, &flushpos, &have_pending_txes);
2955 
2956  /*
2957  * No outstanding transactions to flush, we can report the latest received
2958  * position. This is important for synchronous replication.
2959  */
2960  if (!have_pending_txes)
2961  flushpos = writepos = recvpos;
2962 
2963  if (writepos < last_writepos)
2964  writepos = last_writepos;
2965 
2966  if (flushpos < last_flushpos)
2967  flushpos = last_flushpos;
2968 
2970 
2971  /* if we've already reported everything we're good */
2972  if (!force &&
2973  writepos == last_writepos &&
2974  flushpos == last_flushpos &&
2975  !TimestampDifferenceExceeds(send_time, now,
2977  return;
2978  send_time = now;
2979 
2980  if (!reply_message)
2981  {
2983 
2985  MemoryContextSwitchTo(oldctx);
2986  }
2987  else
2989 
2990  pq_sendbyte(reply_message, 'r');
2991  pq_sendint64(reply_message, recvpos); /* write */
2992  pq_sendint64(reply_message, flushpos); /* flush */
2993  pq_sendint64(reply_message, writepos); /* apply */
2994  pq_sendint64(reply_message, now); /* sendTime */
2995  pq_sendbyte(reply_message, requestReply); /* replyRequested */
2996 
2997  elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2998  force,
2999  LSN_FORMAT_ARGS(recvpos),
3000  LSN_FORMAT_ARGS(writepos),
3001  LSN_FORMAT_ARGS(flushpos));
3002 
3005 
3006  if (recvpos > last_recvpos)
3007  last_recvpos = recvpos;
3008  if (writepos > last_writepos)
3009  last_writepos = writepos;
3010  if (flushpos > last_flushpos)
3011  last_flushpos = flushpos;
3012 }
static void get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, bool *have_pending_txes)
Definition: worker.c:2591
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
#define DEBUG2
Definition: elog.h:23
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static StringInfoData reply_message
Definition: walreceiver.c:119
int wal_receiver_status_interval
Definition: walreceiver.c:90
#define walrcv_send(conn, buffer, nbytes)
Definition: walreceiver.h:424

References ApplyContext, StringInfoData::data, DEBUG2, elog, get_flush_position(), GetCurrentTimestamp(), InvalidXLogRecPtr, StringInfoData::len, LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, makeStringInfo(), MemoryContextSwitchTo(), now(), pq_sendbyte(), pq_sendint64(), reply_message, resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by LogicalRepApplyLoop().

◆ set_apply_error_context_xact()

◆ should_apply_changes_for_rel()

static bool should_apply_changes_for_rel ( LogicalRepRelMapEntry rel)
static

Definition at line 380 of file worker.c.

381 {
382  if (am_tablesync_worker())
383  return MyLogicalRepWorker->relid == rel->localreloid;
384  else
385  return (rel->state == SUBREL_STATE_READY ||
386  (rel->state == SUBREL_STATE_SYNCDONE &&
387  rel->statelsn <= remote_final_lsn));
388 }

References am_tablesync_worker(), LogicalRepRelMapEntry::localreloid, MyLogicalRepWorker, LogicalRepWorker::relid, remote_final_lsn, LogicalRepRelMapEntry::state, and LogicalRepRelMapEntry::statelsn.

Referenced by apply_handle_delete(), apply_handle_insert(), apply_handle_truncate(), and apply_handle_update().

◆ slot_fill_defaults()

static void slot_fill_defaults ( LogicalRepRelMapEntry rel,
EState estate,
TupleTableSlot slot 
)
static

Definition at line 559 of file worker.c.

561 {
562  TupleDesc desc = RelationGetDescr(rel->localrel);
563  int num_phys_attrs = desc->natts;
564  int i;
565  int attnum,
566  num_defaults = 0;
567  int *defmap;
568  ExprState **defexprs;
569  ExprContext *econtext;
570 
571  econtext = GetPerTupleExprContext(estate);
572 
573  /* We got all the data via replication, no need to evaluate anything. */
574  if (num_phys_attrs == rel->remoterel.natts)
575  return;
576 
577  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
578  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
579 
580  Assert(rel->attrmap->maplen == num_phys_attrs);
581  for (attnum = 0; attnum < num_phys_attrs; attnum++)
582  {
583  Expr *defexpr;
584 
585  if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
586  continue;
587 
588  if (rel->attrmap->attnums[attnum] >= 0)
589  continue;
590 
591  defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
592 
593  if (defexpr != NULL)
594  {
595  /* Run the expression through planner */
596  defexpr = expression_planner(defexpr);
597 
598  /* Initialize executable expression in copycontext */
599  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
600  defmap[num_defaults] = attnum;
601  num_defaults++;
602  }
603  }
604 
605  for (i = 0; i < num_defaults; i++)
606  slot->tts_values[defmap[i]] =
607  ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
608 }
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:160
#define GetPerTupleExprContext(estate)
Definition: executor.h:537
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:320
int16 attnum
Definition: pg_attribute.h:83
Expr * expression_planner(Expr *expr)
Definition: planner.c:5879
Node * build_column_default(Relation rel, int attrno)
int maplen
Definition: attmap.h:37
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126

References Assert(), attnum, AttrMap::attnums, LogicalRepRelMapEntry::attrmap, build_column_default(), ExecEvalExpr(), ExecInitExpr(), expression_planner(), GetPerTupleExprContext, i, LogicalRepRelMapEntry::localrel, AttrMap::maplen, TupleDescData::natts, LogicalRepRelation::natts, palloc(), RelationGetDescr, LogicalRepRelMapEntry::remoterel, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_insert().

◆ slot_modify_data()

static void slot_modify_data ( TupleTableSlot slot,
TupleTableSlot srcslot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 717 of file worker.c.

720 {
721  int natts = slot->tts_tupleDescriptor->natts;
722  int i;
723 
724  /* We'll fill "slot" with a virtual tuple, so we must start with ... */
725  ExecClearTuple(slot);
726 
727  /*
728  * Copy all the column data from srcslot, so that we'll have valid values
729  * for unreplaced columns.
730  */
731  Assert(natts == srcslot->tts_tupleDescriptor->natts);
732  slot_getallattrs(srcslot);
733  memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
734  memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
735 
736  /* Call the "in" function for each replaced attribute */
737  Assert(natts == rel->attrmap->maplen);
738  for (i = 0; i < natts; i++)
739  {
741  int remoteattnum = rel->attrmap->attnums[i];
742 
743  if (remoteattnum < 0)
744  continue;
745 
746  Assert(remoteattnum < tupleData->ncols);
747 
748  if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
749  {
750  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
751 
752  /* Set attnum for error callback */
754 
755  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
756  {
757  Oid typinput;
758  Oid typioparam;
759 
760  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
761  slot->tts_values[i] =
762  OidInputFunctionCall(typinput, colvalue->data,
763  typioparam, att->atttypmod);
764  slot->tts_isnull[i] = false;
765  }
766  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
767  {
768  Oid typreceive;
769  Oid typioparam;
770 
771  /*
772  * In some code paths we may be asked to re-parse the same
773  * tuple data. Reset the StringInfo's cursor so that works.
774  */
775  colvalue->cursor = 0;
776 
777  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
778  slot->tts_values[i] =
779  OidReceiveFunctionCall(typreceive, colvalue,
780  typioparam, att->atttypmod);
781 
782  /* Trouble if it didn't eat the whole buffer */
783  if (colvalue->cursor != colvalue->len)
784  ereport(ERROR,
785  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
786  errmsg("incorrect binary data format in logical replication column %d",
787  remoteattnum + 1)));
788  slot->tts_isnull[i] = false;
789  }
790  else
791  {
792  /* must be LOGICALREP_COLUMN_NULL */
793  slot->tts_values[i] = (Datum) 0;
794  slot->tts_isnull[i] = true;
795  }
796 
797  /* Reset attnum for error callback */
799  }
800  }
801 
802  /* And finally, declare that "slot" contains a valid virtual tuple */
803  ExecStoreVirtualTuple(slot);
804 }
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
Datum OidReceiveFunctionCall(Oid functionId, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1648
Datum OidInputFunctionCall(Oid functionId, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1630
#define LOGICALREP_COLUMN_BINARY
Definition: logicalproto.h:94
#define LOGICALREP_COLUMN_TEXT
Definition: logicalproto.h:93
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2831
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2897
StringInfoData * colvalues
Definition: logicalproto.h:82

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ slot_store_data()

static void slot_store_data ( TupleTableSlot slot,
LogicalRepRelMapEntry rel,
LogicalRepTupleData tupleData 
)
static

Definition at line 616 of file worker.c.

618 {
619  int natts = slot->tts_tupleDescriptor->natts;
620  int i;
621 
622  ExecClearTuple(slot);
623 
624  /* Call the "in" function for each non-dropped, non-null attribute */
625  Assert(natts == rel->attrmap->maplen);
626  for (i = 0; i < natts; i++)
627  {
629  int remoteattnum = rel->attrmap->attnums[i];
630 
631  if (!att->attisdropped && remoteattnum >= 0)
632  {
633  StringInfo colvalue = &tupleData->colvalues[remoteattnum];
634 
635  Assert(remoteattnum < tupleData->ncols);
636 
637  /* Set attnum for error callback */
639 
640  if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
641  {
642  Oid typinput;
643  Oid typioparam;
644 
645  getTypeInputInfo(att->atttypid, &typinput, &typioparam);
646  slot->tts_values[i] =
647  OidInputFunctionCall(typinput, colvalue->data,
648  typioparam, att->atttypmod);
649  slot->tts_isnull[i] = false;
650  }
651  else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
652  {
653  Oid typreceive;
654  Oid typioparam;
655 
656  /*
657  * In some code paths we may be asked to re-parse the same
658  * tuple data. Reset the StringInfo's cursor so that works.
659  */
660  colvalue->cursor = 0;
661 
662  getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
663  slot->tts_values[i] =
664  OidReceiveFunctionCall(typreceive, colvalue,
665  typioparam, att->atttypmod);
666 
667  /* Trouble if it didn't eat the whole buffer */
668  if (colvalue->cursor != colvalue->len)
669  ereport(ERROR,
670  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
671  errmsg("incorrect binary data format in logical replication column %d",
672  remoteattnum + 1)));
673  slot->tts_isnull[i] = false;
674  }
675  else
676  {
677  /*
678  * NULL value from remote. (We don't expect to see
679  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
680  * NULL.)
681  */
682  slot->tts_values[i] = (Datum) 0;
683  slot->tts_isnull[i] = true;
684  }
685 
686  /* Reset attnum for error callback */
688  }
689  else
690  {
691  /*
692  * We assign NULL to dropped attributes and missing values
693  * (missing values should be later filled using
694  * slot_fill_defaults).
695  */
696  slot->tts_values[i] = (Datum) 0;
697  slot->tts_isnull[i] = true;
698  }
699  }
700 
701  ExecStoreVirtualTuple(slot);
702 }

References apply_error_callback_arg, Assert(), AttrMap::attnums, LogicalRepRelMapEntry::attrmap, LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, StringInfoData::cursor, StringInfoData::data, ereport, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecStoreVirtualTuple(), getTypeBinaryInputInfo(), getTypeInputInfo(), i, StringInfoData::len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_TEXT, AttrMap::maplen, TupleDescData::natts, OidInputFunctionCall(), OidReceiveFunctionCall(), ApplyErrorCallbackArg::remote_attnum, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, and TupleDescAttr.

Referenced by apply_handle_delete(), apply_handle_insert(), and apply_handle_update().

◆ start_apply()

static void start_apply ( XLogRecPtr  origin_startpos)
static

Definition at line 3545 of file worker.c.

3546 {
3547  PG_TRY();
3548  {
3549  LogicalRepApplyLoop(origin_startpos);
3550  }
3551  PG_CATCH();
3552  {
3555  else
3556  {
3557  /*
3558  * Report the worker failed while applying changes. Abort the
3559  * current transaction so that the stats message is sent in an
3560  * idle state.
3561  */
3564 
3565  PG_RE_THROW();
3566  }
3567  }
3568  PG_END_TRY();
3569 }
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:2670
static void DisableSubscriptionAndExit(void)
Definition: worker.c:3815
#define PG_RE_THROW()
Definition: elog.h:340
#define PG_END_TRY()
Definition: elog.h:324
#define PG_TRY()
Definition: elog.h:299
#define PG_CATCH()
Definition: elog.h:309

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by ApplyWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr origin_startpos,
char **  myslotname 
)
static

Definition at line 3502 of file worker.c.

3503 {
3504  char *syncslotname = NULL;
3505 
3507 
3508  PG_TRY();
3509  {
3510  /* Call initial sync. */
3511  syncslotname = LogicalRepSyncTableStart(origin_startpos);
3512  }
3513  PG_CATCH();
3514  {
3517  else
3518  {
3519  /*
3520  * Report the worker failed during table synchronization. Abort
3521  * the current transaction so that the stats message is sent in an
3522  * idle state.
3523  */
3526 
3527  PG_RE_THROW();
3528  }
3529  }
3530  PG_END_TRY();
3531 
3532  /* allocate slot name in long-lived context */
3533  *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3534  pfree(syncslotname);
3535 }
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1183

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by ApplyWorkerMain().

◆ stop_skipping_changes()

static void stop_skipping_changes ( void  )
static

Definition at line 3887 of file worker.c.

3888 {
3889  if (!is_skipping_changes())
3890  return;
3891 
3892  ereport(LOG,
3893  (errmsg("done skipping logical replication transaction finished at %X/%X",
3895 
3896  /* Stop skipping changes */
3898 }

References ereport, errmsg(), InvalidXLogRecPtr, is_skipping_changes, LOG, LSN_FORMAT_ARGS, and skip_xact_finish_lsn.

Referenced by apply_handle_commit_internal(), apply_handle_prepare(), and apply_handle_stream_prepare().

◆ store_flush_position()

static void store_flush_position ( XLogRecPtr  remote_lsn)
static

Definition at line 2635 of file worker.c.

2636 {
2637  FlushPosition *flushpos;
2638 
2639  /* Need to do this in permanent context */
2641 
2642  /* Track commit lsn */
2643  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2644  flushpos->local_end = XactLastCommitEnd;
2645  flushpos->remote_end = remote_lsn;
2646 
2647  dlist_push_tail(&lsn_mapping, &flushpos->node);
2649 }
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
dlist_node node
Definition: worker.c:203
XLogRecPtr XactLastCommitEnd
Definition: xlog.c:252

References ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), FlushPosition::remote_end, and XactLastCommitEnd.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), and apply_handle_stream_prepare().

◆ stream_cleanup_files()

static void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)
static

Definition at line 3340 of file worker.c.

3341 {
3342  char path[MAXPGPATH];
3343 
3344  /* Delete the changes file. */
3345  changes_filename(path, subid, xid);
3347 
3348  /* Delete the subxact file, if it exists. */
3349  subxact_filename(path, subid, xid);
3351 }
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:3319
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:359

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ stream_close_file()

static void stream_close_file ( void  )
static

Definition at line 3416 of file worker.c.

3417 {
3420  Assert(stream_fd != NULL);
3421 
3423 
3425  stream_fd = NULL;
3426 }

References Assert(), BufFileClose(), in_streamed_transaction, InvalidTransactionId, stream_fd, stream_xid, and TransactionIdIsValid.

Referenced by apply_handle_stream_stop().

◆ stream_open_file()

static void stream_open_file ( Oid  subid,
TransactionId  xid,
bool  first 
)
static

Definition at line 3367 of file worker.c.

3368 {
3369  char path[MAXPGPATH];
3370  MemoryContext oldcxt;
3371 
3373  Assert(OidIsValid(subid));
3375  Assert(stream_fd == NULL);
3376 
3377 
3378  changes_filename(path, subid, xid);
3379  elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
3380 
3381  /*
3382  * Create/open the buffiles under the logical streaming context so that we
3383  * have those files until stream stop.
3384  */
3386 
3387  /*
3388  * If this is the first streamed segment, create the changes file.
3389  * Otherwise, just open the file for writing, in append mode.
3390  */
3391  if (first_segment)
3393  path);
3394  else
3395  {
3396  /*
3397  * Open the file and seek to the end of the file because we always
3398  * append the changes file.
3399  */
3401  path, O_RDWR, false);
3402  BufFileSeek(stream_fd, 0, 0, SEEK_END);
3403  }
3404 
3405  MemoryContextSwitchTo(oldcxt);
3406 }
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
Definition: buffile.c:689
BufFile * BufFileCreateFileSet(FileSet *fileset, const char *name)
Definition: buffile.c:262

References Assert(), BufFileCreateFileSet(), BufFileOpenFileSet(), BufFileSeek(), changes_filename(), DEBUG1, elog, in_streamed_transaction, LogicalStreamingContext, MAXPGPATH, MemoryContextSwitchTo(), MyLogicalRepWorker, OidIsValid, stream_fd, LogicalRepWorker::stream_fileset, and TransactionIdIsValid.

Referenced by apply_handle_stream_start().

◆ stream_write_change()

static void stream_write_change ( char  action,
StringInfo  s 
)
static

Definition at line 3437 of file worker.c.

3438 {
3439  int len;
3440 
3443  Assert(stream_fd != NULL);
3444 
3445  /* total on-disk size, including the action type character */
3446  len = (s->len - s->cursor) + sizeof(char);
3447 
3448  /* first write the size */
3449  BufFileWrite(stream_fd, &len, sizeof(len));
3450 
3451  /* then the action */
3452  BufFileWrite(stream_fd, &action, sizeof(action));
3453 
3454  /* and finally the remaining part of the buffer (after the XID) */
3455  len = (s->len - s->cursor);
3456 
3457  BufFileWrite(stream_fd, &s->data[s->cursor], len);
3458 }
void BufFileWrite(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:625

References generate_unaccent_rules::action, Assert(), BufFileWrite(), StringInfoData::cursor, StringInfoData::data, in_streamed_transaction, StringInfoData::len, len, stream_fd, stream_xid, and TransactionIdIsValid.

Referenced by handle_streamed_transaction().

◆ subscription_change_cb()

static void subscription_change_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 3117 of file worker.c.

3118 {
3119  MySubscriptionValid = false;
3120 }

References MySubscriptionValid.

Referenced by ApplyWorkerMain().

◆ subxact_filename()

static void subxact_filename ( char *  path,
Oid  subid,
TransactionId  xid 
)
inlinestatic

Definition at line 3319 of file worker.c.

3320 {
3321  snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
3322 }

References MAXPGPATH, and snprintf.

Referenced by stream_cleanup_files(), subxact_info_read(), and subxact_info_write().

◆ subxact_info_add()

static void subxact_info_add ( TransactionId  xid)
static

Definition at line 3241 of file worker.c.

3242 {
3243  SubXactInfo *subxacts = subxact_data.subxacts;
3244  int64 i;
3245