PostgreSQL Source Code  git master
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 
struct  ParallelApplyWorkerShared
 
struct  ParallelApplyWorkerInfo
 

Macros

#define isParallelApplyWorker(worker)   ((worker)->leader_pid != InvalidPid)
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 
typedef enum ParallelTransState ParallelTransState
 
typedef enum PartialFileSetState PartialFileSetState
 
typedef struct ParallelApplyWorkerShared ParallelApplyWorkerShared
 
typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
 

Enumerations

enum  ParallelTransState { PARALLEL_TRANS_UNKNOWN , PARALLEL_TRANS_STARTED , PARALLEL_TRANS_FINISHED }
 
enum  PartialFileSetState { FS_EMPTY , FS_SERIALIZE_IN_PROGRESS , FS_SERIALIZE_DONE , FS_READY }
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorkerlogicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
Listlogicalrep_workers_find (Oid subid, bool only_running)
 
bool logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
void stream_stop_internal (TransactionId xid)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
void apply_dispatch (StringInfo s)
 
void maybe_reread_subscription (void)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void InitializeApplyWorker (void)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
void apply_error_callback (void *arg)
 
void set_apply_error_context_origin (char *originname)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfopa_find_worker (TransactionId xid)
 
void pa_detach_all_error_mq (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
static bool am_tablesync_worker (void)
 
static bool am_leader_apply_worker (void)
 
static bool am_parallel_apply_worker (void)
 

Variables

PGDLLIMPORT MemoryContext ApplyContext
 
PGDLLIMPORT MemoryContext ApplyMessageContext
 
PGDLLIMPORT ErrorContextCallbackapply_error_context_stack
 
PGDLLIMPORT ParallelApplyWorkerSharedMyParallelShared
 
PGDLLIMPORT struct WalReceiverConnLogRepWorkerWalRcvConn
 
PGDLLIMPORT SubscriptionMySubscription
 
PGDLLIMPORT LogicalRepWorkerMyLogicalRepWorker
 
PGDLLIMPORT bool in_remote_transaction
 
PGDLLIMPORT bool InitializingApplyWorker
 

Macro Definition Documentation

◆ isParallelApplyWorker

#define isParallelApplyWorker (   worker)    ((worker)->leader_pid != InvalidPid)

Definition at line 308 of file worker_internal.h.

Typedef Documentation

◆ LogicalRepWorker

◆ ParallelApplyWorkerInfo

◆ ParallelApplyWorkerShared

◆ ParallelTransState

◆ PartialFileSetState

Enumeration Type Documentation

◆ ParallelTransState

Enumerator
PARALLEL_TRANS_UNKNOWN 
PARALLEL_TRANS_STARTED 
PARALLEL_TRANS_FINISHED 

Definition at line 93 of file worker_internal.h.

94 {
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED

◆ PartialFileSetState

Enumerator
FS_EMPTY 
FS_SERIALIZE_IN_PROGRESS 
FS_SERIALIZE_DONE 
FS_READY 

Definition at line 116 of file worker_internal.h.

117 {
118  FS_EMPTY,
121  FS_READY
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1580 of file tablesync.c.

1581 {
1582  bool started_tx = false;
1583  bool has_subrels = false;
1584 
1585  /* We need up-to-date sync state info for subscription tables here. */
1586  has_subrels = FetchTableStates(&started_tx);
1587 
1588  if (started_tx)
1589  {
1591  pgstat_report_stat(true);
1592  }
1593 
1594  /*
1595  * Return false when there are no tables in subscription or not all tables
1596  * are in ready state; true otherwise.
1597  */
1598  return has_subrels && (table_states_not_ready == NIL);
1599 }
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:582
static List * table_states_not_ready
Definition: tablesync.c:125
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1519
void CommitTransactionCommand(void)
Definition: xact.c:3034

References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by ApplyWorkerMain(), pa_can_start(), and process_syncing_tables_for_apply().

◆ am_leader_apply_worker()

static bool am_leader_apply_worker ( void  )
inlinestatic

Definition at line 317 of file worker_internal.h.

318 {
319  return (!am_tablesync_worker() &&
321 }
#define isParallelApplyWorker(worker)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
static bool am_tablesync_worker(void)

References am_tablesync_worker(), isParallelApplyWorker, and MyLogicalRepWorker.

Referenced by logicalrep_worker_detach(), pa_can_start(), pa_set_fileset_state(), and pa_xact_finish().

◆ am_parallel_apply_worker()

◆ am_tablesync_worker()

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3285 of file worker.c.

3286 {
3288  LogicalRepMsgType saved_command;
3289 
3290  /*
3291  * Set the current command being applied. Since this function can be
3292  * called recursively when applying spooled changes, save the current
3293  * command.
3294  */
3295  saved_command = apply_error_callback_arg.command;
3297 
3298  switch (action)
3299  {
3300  case LOGICAL_REP_MSG_BEGIN:
3301  apply_handle_begin(s);
3302  break;
3303 
3306  break;
3307 
3310  break;
3311 
3314  break;
3315 
3318  break;
3319 
3322  break;
3323 
3326  break;
3327 
3328  case LOGICAL_REP_MSG_TYPE:
3329  apply_handle_type(s);
3330  break;
3331 
3334  break;
3335 
3337 
3338  /*
3339  * Logical replication does not use generic logical messages yet.
3340  * Although, it could be used by other applications that use this
3341  * output plugin.
3342  */
3343  break;
3344 
3347  break;
3348 
3351  break;
3352 
3355  break;
3356 
3359  break;
3360 
3363  break;
3364 
3367  break;
3368 
3371  break;
3372 
3375  break;
3376 
3379  break;
3380 
3381  default:
3382  ereport(ERROR,
3383  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3384  errmsg("invalid logical replication message type \"%c\"", action)));
3385  }
3386 
3387  /* Reset the current command */
3388  apply_error_callback_arg.command = saved_command;
3389 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1295
static void apply_handle_type(StringInfo s)
Definition: worker.c:2350
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3157
static void apply_handle_update(StringInfo s)
Definition: worker.c:2546
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2157
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1193
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:295
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2730
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1015
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1040
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1836
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2327
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1132
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1242
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1650
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1432
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1066
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1491
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2397
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
LogicalRepMsgType command
Definition: worker.c:238

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4916 of file worker.c.

4917 {
4919 
4921  return;
4922 
4923  Assert(errarg->origin_name);
4924 
4925  if (errarg->rel == NULL)
4926  {
4927  if (!TransactionIdIsValid(errarg->remote_xid))
4928  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4929  errarg->origin_name,
4930  logicalrep_message_type(errarg->command));
4931  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4932  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4933  errarg->origin_name,
4935  errarg->remote_xid);
4936  else
4937  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4938  errarg->origin_name,
4940  errarg->remote_xid,
4941  LSN_FORMAT_ARGS(errarg->finish_lsn));
4942  }
4943  else
4944  {
4945  if (errarg->remote_attnum < 0)
4946  {
4947  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4948  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4949  errarg->origin_name,
4951  errarg->rel->remoterel.nspname,
4952  errarg->rel->remoterel.relname,
4953  errarg->remote_xid);
4954  else
4955  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4956  errarg->origin_name,
4958  errarg->rel->remoterel.nspname,
4959  errarg->rel->remoterel.relname,
4960  errarg->remote_xid,
4961  LSN_FORMAT_ARGS(errarg->finish_lsn));
4962  }
4963  else
4964  {
4965  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4966  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4967  errarg->origin_name,
4969  errarg->rel->remoterel.nspname,
4970  errarg->rel->remoterel.relname,
4971  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4972  errarg->remote_xid);
4973  else
4974  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4975  errarg->origin_name,
4977  errarg->rel->remoterel.nspname,
4978  errarg->rel->remoterel.relname,
4979  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4980  errarg->remote_xid,
4981  LSN_FORMAT_ARGS(errarg->finish_lsn));
4982  }
4983  }
4984 }
#define errcontext
Definition: elog.h:196
Assert(fmt[strlen(fmt) - 1] !='\n')
char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:243
XLogRecPtr finish_lsn
Definition: worker.c:244
LogicalRepRelMapEntry * rel
Definition: worker.c:239
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2025 of file worker.c.

2027 {
2029  int nchanges;
2030  char path[MAXPGPATH];
2031  char *buffer = NULL;
2032  MemoryContext oldcxt;
2033  ResourceOwner oldowner;
2034  int fileno;
2035  off_t offset;
2036 
2037  if (!am_parallel_apply_worker())
2039 
2040  /* Make sure we have an open transaction */
2042 
2043  /*
2044  * Allocate file handle and memory required to process all the messages in
2045  * TopTransactionContext to avoid them getting reset after each message is
2046  * processed.
2047  */
2049 
2050  /* Open the spool file for the committed/prepared transaction */
2052  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2053 
2054  /*
2055  * Make sure the file is owned by the toplevel transaction so that the
2056  * file will not be accidentally closed when aborting a subtransaction.
2057  */
2058  oldowner = CurrentResourceOwner;
2060 
2061  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2062 
2063  CurrentResourceOwner = oldowner;
2064 
2065  buffer = palloc(BLCKSZ);
2066  initStringInfo(&s2);
2067 
2068  MemoryContextSwitchTo(oldcxt);
2069 
2070  remote_final_lsn = lsn;
2071 
2072  /*
2073  * Make sure the handle apply_dispatch methods are aware we're in a remote
2074  * transaction.
2075  */
2076  in_remote_transaction = true;
2078 
2080 
2081  /*
2082  * Read the entries one by one and pass them through the same logic as in
2083  * apply_dispatch.
2084  */
2085  nchanges = 0;
2086  while (true)
2087  {
2088  size_t nbytes;
2089  int len;
2090 
2092 
2093  /* read length of the on-disk record */
2094  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2095 
2096  /* have we reached end of the file? */
2097  if (nbytes == 0)
2098  break;
2099 
2100  /* do we have a correct length? */
2101  if (len <= 0)
2102  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2103  len, path);
2104 
2105  /* make sure we have sufficiently large buffer */
2106  buffer = repalloc(buffer, len);
2107 
2108  /* and finally read the data into the buffer */
2109  BufFileReadExact(stream_fd, buffer, len);
2110 
2111  BufFileTell(stream_fd, &fileno, &offset);
2112 
2113  /* copy the buffer to the stringinfo and call apply_dispatch */
2114  resetStringInfo(&s2);
2115  appendBinaryStringInfo(&s2, buffer, len);
2116 
2117  /* Ensure we are reading the data into our memory context. */
2119 
2120  apply_dispatch(&s2);
2121 
2123 
2124  MemoryContextSwitchTo(oldcxt);
2125 
2126  nchanges++;
2127 
2128  /*
2129  * It is possible the file has been closed because we have processed
2130  * the transaction end message like stream_commit in which case that
2131  * must be the last message.
2132  */
2133  if (!stream_fd)
2134  {
2135  ensure_last_message(stream_fileset, xid, fileno, offset);
2136  break;
2137  }
2138 
2139  if (nchanges % 1000 == 0)
2140  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2141  nchanges, path);
2142  }
2143 
2144  if (stream_fd)
2146 
2147  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2148  nchanges, path);
2149 
2150  return;
2151 }
static void begin_replication_step(void)
Definition: worker.c:532
static void end_replication_step(void)
Definition: worker.c:555
MemoryContext ApplyMessageContext
Definition: worker.c:307
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4185
static BufFile * stream_fd
Definition: worker.c:356
bool in_remote_transaction
Definition: worker.c:320
void apply_dispatch(StringInfo s)
Definition: worker.c:3285
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1993
static XLogRecPtr remote_final_lsn
Definition: worker.c:321
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4786
static void stream_close_file(void)
Definition: worker.c:4268
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
#define DEBUG1
Definition: elog.h:30
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1476
void * palloc(Size size)
Definition: mcxt.c:1226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:149
ResourceOwner CurrentResourceOwner
Definition: resowner.c:147
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), appendBinaryStringInfo(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog(), end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), resetStringInfo(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ InitializeApplyWorker()

void InitializeApplyWorker ( void  )

Definition at line 4445 of file worker.c.

4446 {
4447  MemoryContext oldctx;
4448 
4449  /* Run as replica session replication role. */
4450  SetConfigOption("session_replication_role", "replica",
4452 
4453  /* Connect to our database. */
4456  0);
4457 
4458  /*
4459  * Set always-secure search path, so malicious users can't redirect user
4460  * code (e.g. pg_index.indexprs).
4461  */
4462  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4463 
4464  /* Load the subscription into persistent memory context. */
4466  "ApplyContext",
4470 
4472  if (!MySubscription)
4473  {
4474  ereport(LOG,
4475  /* translator: %s is the name of logical replication worker */
4476  (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
4478 
4479  /* Ensure we remove no-longer-useful entry for worker's start time */
4482  proc_exit(0);
4483  }
4484 
4485  MySubscriptionValid = true;
4486  MemoryContextSwitchTo(oldctx);
4487 
4488  if (!MySubscription->enabled)
4489  {
4490  ereport(LOG,
4491  /* translator: first %s is the name of logical replication worker */
4492  (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
4494 
4496  }
4497 
4498  /* Setup synchronous commit according to the user's wishes */
4499  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4501 
4502  /* Keep us informed about subscription changes. */
4505  (Datum) 0);
4506 
4507  if (am_tablesync_worker())
4508  ereport(LOG,
4509  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4512  else
4513  ereport(LOG,
4514  /* translator: first %s is the name of logical replication worker */
4515  (errmsg("%s for subscription \"%s\" has started",
4517 
4519 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:3985
static void apply_worker_exit(void)
Definition: worker.c:3841
MemoryContext ApplyContext
Definition: worker.c:308
static bool MySubscriptionValid
Definition: worker.c:316
Subscription * MySubscription
Definition: worker.c:315
static const char * get_worker_name(void)
Definition: worker.c:442
#define LOG
Definition: elog.h:31
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4176
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1052
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
Subscription * GetSubscription(Oid subid, bool missing_ok)
uintptr_t Datum
Definition: postgres.h:64
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5627
@ SUBSCRIPTIONOID
Definition: syscache.h:99
void StartTransactionCommand(void)
Definition: xact.c:2937

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_parallel_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), get_worker_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 272 of file tablesync.c.

273 {
274  table_states_valid = false;
275 }
static bool table_states_valid
Definition: tablesync.c:124

References table_states_valid.

Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo winfo)

Definition at line 618 of file launcher.c.

619 {
620  int slot_no;
621  uint16 generation;
622  LogicalRepWorker *worker;
623 
624  SpinLockAcquire(&winfo->shared->mutex);
625  generation = winfo->shared->logicalrep_worker_generation;
626  slot_no = winfo->shared->logicalrep_worker_slot_no;
627  SpinLockRelease(&winfo->shared->mutex);
628 
629  Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
630 
631  /*
632  * Detach from the error_mq_handle for the parallel apply worker before
633  * stopping it. This prevents the leader apply worker from trying to
634  * receive the message from the error queue that might already be detached
635  * by the parallel apply worker.
636  */
637  if (winfo->error_mq_handle)
638  {
640  winfo->error_mq_handle = NULL;
641  }
642 
643  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
644 
645  worker = &LogicalRepCtx->workers[slot_no];
646  Assert(isParallelApplyWorker(worker));
647 
648  /*
649  * Only stop the worker if the generation matches and the worker is alive.
650  */
651  if (worker->generation == generation && worker->proc)
652  logicalrep_worker_stop_internal(worker, SIGINT);
653 
654  LWLockRelease(LogicalRepWorkerLock);
655 }
unsigned short uint16
Definition: c.h:489
int max_logical_replication_workers
Definition: launcher.c:57
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition: launcher.c:512
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:76
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:117
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:844
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:73
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared

References Assert(), ParallelApplyWorkerInfo::error_mq_handle, LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SpinLockAcquire, SpinLockRelease, and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 832 of file launcher.c.

833 {
834  int i;
835  int res = 0;
836 
837  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
838 
839  /* Search for attached worker for a given subscription id. */
840  for (i = 0; i < max_logical_replication_workers; i++)
841  {
843 
844  if (w->subid == subid && OidIsValid(w->relid))
845  res++;
846  }
847 
848  return res;
849 }
int i
Definition: isn.c:73
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1919

References Assert(), i, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 692 of file launcher.c.

693 {
694  /* Block concurrent access. */
695  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
696 
697  Assert(slot >= 0 && slot < max_logical_replication_workers);
699 
701  {
702  LWLockRelease(LogicalRepWorkerLock);
703  ereport(ERROR,
704  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
705  errmsg("logical replication worker slot %d is empty, cannot attach",
706  slot)));
707  }
708 
710  {
711  LWLockRelease(LogicalRepWorkerLock);
712  ereport(ERROR,
713  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
714  errmsg("logical replication worker slot %d is already used by "
715  "another worker, cannot attach", slot)));
716  }
717 
720 
721  LWLockRelease(LogicalRepWorkerLock);
722 }
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:802
@ LW_EXCLUSIVE
Definition: lwlock.h:116
PGPROC * MyProc
Definition: proc.c:66

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().

◆ logicalrep_worker_find()

LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 249 of file launcher.c.

250 {
251  int i;
252  LogicalRepWorker *res = NULL;
253 
254  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
255 
256  /* Search for attached worker for a given subscription id. */
257  for (i = 0; i < max_logical_replication_workers; i++)
258  {
260 
261  /* Skip parallel apply workers. */
262  if (isParallelApplyWorker(w))
263  continue;
264 
265  if (w->in_use && w->subid == subid && w->relid == relid &&
266  (!only_running || w->proc))
267  {
268  res = w;
269  break;
270  }
271  }
272 
273  return res;
274 }

References Assert(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm 
)

Definition at line 306 of file launcher.c.

308 {
309  BackgroundWorker bgw;
310  BackgroundWorkerHandle *bgw_handle;
311  uint16 generation;
312  int i;
313  int slot = 0;
314  LogicalRepWorker *worker = NULL;
315  int nsyncworkers;
316  int nparallelapplyworkers;
318  bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
319 
320  /* Sanity check - tablesync worker cannot be a subworker */
321  Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
322 
323  ereport(DEBUG1,
324  (errmsg_internal("starting logical replication worker for subscription \"%s\"",
325  subname)));
326 
327  /* Report this after the initial starting message for consistency. */
328  if (max_replication_slots == 0)
329  ereport(ERROR,
330  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
331  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
332 
333  /*
334  * We need to do the modification of the shared memory under lock so that
335  * we have consistent view.
336  */
337  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
338 
339 retry:
340  /* Find unused worker slot. */
341  for (i = 0; i < max_logical_replication_workers; i++)
342  {
344 
345  if (!w->in_use)
346  {
347  worker = w;
348  slot = i;
349  break;
350  }
351  }
352 
353  nsyncworkers = logicalrep_sync_worker_count(subid);
354 
356 
357  /*
358  * If we didn't find a free slot, try to do garbage collection. The
359  * reason we do this is because if some worker failed to start up and its
360  * parent has crashed while waiting, the in_use state was never cleared.
361  */
362  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
363  {
364  bool did_cleanup = false;
365 
366  for (i = 0; i < max_logical_replication_workers; i++)
367  {
369 
370  /*
371  * If the worker was marked in use but didn't manage to attach in
372  * time, clean it up.
373  */
374  if (w->in_use && !w->proc &&
377  {
378  elog(WARNING,
379  "logical replication worker for subscription %u took too long to start; canceled",
380  w->subid);
381 
383  did_cleanup = true;
384  }
385  }
386 
387  if (did_cleanup)
388  goto retry;
389  }
390 
391  /*
392  * We don't allow to invoke more sync workers once we have reached the
393  * sync worker limit per subscription. So, just return silently as we
394  * might get here because of an otherwise harmless race condition.
395  */
396  if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
397  {
398  LWLockRelease(LogicalRepWorkerLock);
399  return false;
400  }
401 
402  nparallelapplyworkers = logicalrep_pa_worker_count(subid);
403 
404  /*
405  * Return false if the number of parallel apply workers reached the limit
406  * per subscription.
407  */
408  if (is_parallel_apply_worker &&
409  nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
410  {
411  LWLockRelease(LogicalRepWorkerLock);
412  return false;
413  }
414 
415  /*
416  * However if there are no more free worker slots, inform user about it
417  * before exiting.
418  */
419  if (worker == NULL)
420  {
421  LWLockRelease(LogicalRepWorkerLock);
423  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
424  errmsg("out of logical replication worker slots"),
425  errhint("You might need to increase max_logical_replication_workers.")));
426  return false;
427  }
428 
429  /* Prepare the worker slot. */
430  worker->launch_time = now;
431  worker->in_use = true;
432  worker->generation++;
433  worker->proc = NULL;
434  worker->dbid = dbid;
435  worker->userid = userid;
436  worker->subid = subid;
437  worker->relid = relid;
438  worker->relstate = SUBREL_STATE_UNKNOWN;
440  worker->stream_fileset = NULL;
441  worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
442  worker->parallel_apply = is_parallel_apply_worker;
443  worker->last_lsn = InvalidXLogRecPtr;
446  worker->reply_lsn = InvalidXLogRecPtr;
447  TIMESTAMP_NOBEGIN(worker->reply_time);
448 
449  /* Before releasing lock, remember generation for future identification. */
450  generation = worker->generation;
451 
452  LWLockRelease(LogicalRepWorkerLock);
453 
454  /* Register the new dynamic worker. */
455  memset(&bgw, 0, sizeof(bgw));
459  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
460 
461  if (is_parallel_apply_worker)
462  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
463  else
464  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
465 
466  if (OidIsValid(relid))
468  "logical replication worker for subscription %u sync %u", subid, relid);
469  else if (is_parallel_apply_worker)
471  "logical replication parallel apply worker for subscription %u", subid);
472  else
474  "logical replication apply worker for subscription %u", subid);
475 
476  if (is_parallel_apply_worker)
477  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
478  else
479  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
480 
483  bgw.bgw_main_arg = Int32GetDatum(slot);
484 
485  if (is_parallel_apply_worker)
486  memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
487 
488  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
489  {
490  /* Failed to start worker, so clean up the worker slot. */
491  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
492  Assert(generation == worker->generation);
494  LWLockRelease(LogicalRepWorkerLock);
495 
497  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
498  errmsg("out of background worker slots"),
499  errhint("You might need to increase max_worker_processes.")));
500  return false;
501  }
502 
503  /* Now wait until it attaches. */
504  return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
505 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:959
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:158
uint32 dsm_handle
Definition: dsm_impl.h:55
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define WARNING
Definition: elog.h:36
int MyProcPid
Definition: globals.c:44
static int logicalrep_pa_worker_count(Oid subid)
Definition: launcher.c:856
int max_sync_workers_per_subscription
Definition: launcher.c:58
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:189
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:832
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:59
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:771
#define InvalidPid
Definition: miscadmin.h:32
NameData subname
#define snprintf
Definition: port.h:238
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
int max_replication_slots
Definition: slot.c:102
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:96
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
int wal_receiver_timeout
Definition: walreceiver.c:91
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog(), ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), OidIsValid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), pa_launch_parallel_worker(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 594 of file launcher.c.

595 {
596  LogicalRepWorker *worker;
597 
598  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
599 
600  worker = logicalrep_worker_find(subid, relid, false);
601 
602  if (worker)
603  {
604  Assert(!isParallelApplyWorker(worker));
605  logicalrep_worker_stop_internal(worker, SIGTERM);
606  }
607 
608  LWLockRelease(LogicalRepWorkerLock);
609 }
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:249

References Assert(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 661 of file launcher.c.

662 {
663  LogicalRepWorker *worker;
664 
665  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
666 
667  worker = logicalrep_worker_find(subid, relid, true);
668 
669  if (worker)
671 
672  LWLockRelease(LogicalRepWorkerLock);
673 }
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:681

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by apply_handle_stream_start(), and pg_attribute_noreturn().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 681 of file launcher.c.

682 {
683  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
684 
685  SetLatch(&worker->proc->procLatch);
686 }
void SetLatch(Latch *latch)
Definition: latch.c:607
Latch procLatch
Definition: proc.h:170

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by AtEOXact_LogicalRepWorkers(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().

◆ logicalrep_workers_find()

List* logicalrep_workers_find ( Oid  subid,
bool  only_running 
)

Definition at line 281 of file launcher.c.

282 {
283  int i;
284  List *res = NIL;
285 
286  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
287 
288  /* Search for attached worker for a given subscription id. */
289  for (i = 0; i < max_logical_replication_workers; i++)
290  {
292 
293  if (w->in_use && w->subid == subid && (!only_running || w->proc))
294  res = lappend(res, w);
295  }
296 
297  return res;
298 }
List * lappend(List *list, void *datum)
Definition: list.c:338
Definition: pg_list.h:54

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().

◆ LogicalRepSyncTableStart()

char* LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)

Definition at line 1244 of file tablesync.c.

1245 {
1246  char *slotname;
1247  char *err;
1248  char relstate;
1249  XLogRecPtr relstate_lsn;
1250  Relation rel;
1251  AclResult aclresult;
1253  char originname[NAMEDATALEN];
1254  RepOriginId originid;
1255  bool must_use_password;
1256 
1257  /* Check the state of the table synchronization. */
1261  &relstate_lsn);
1263 
1265  MyLogicalRepWorker->relstate = relstate;
1266  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1268 
1269  /*
1270  * If synchronization is already done or no longer necessary, exit now
1271  * that we've updated shared memory state.
1272  */
1273  switch (relstate)
1274  {
1275  case SUBREL_STATE_SYNCDONE:
1276  case SUBREL_STATE_READY:
1277  case SUBREL_STATE_UNKNOWN:
1278  finish_sync_worker(); /* doesn't return */
1279  }
1280 
1281  /* Calculate the name of the tablesync slot. */
1282  slotname = (char *) palloc(NAMEDATALEN);
1285  slotname,
1286  NAMEDATALEN);
1287 
1288  /* Is the use of a password mandatory? */
1289  must_use_password = MySubscription->passwordrequired &&
1291 
1292  /*
1293  * Here we use the slot name instead of the subscription name as the
1294  * application_name, so that it is different from the leader apply worker,
1295  * so that synchronous replication can distinguish them.
1296  */
1299  must_use_password,
1300  slotname, &err);
1301  if (LogRepWorkerWalRcvConn == NULL)
1302  ereport(ERROR,
1303  (errcode(ERRCODE_CONNECTION_FAILURE),
1304  errmsg("could not connect to the publisher: %s", err)));
1305 
1306  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1307  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1308  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1309 
1310  /* Assign the origin tracking record name. */
1313  originname,
1314  sizeof(originname));
1315 
1316  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1317  {
1318  /*
1319  * We have previously errored out before finishing the copy so the
1320  * replication slot might exist. We want to remove the slot if it
1321  * already exists and proceed.
1322  *
1323  * XXX We could also instead try to drop the slot, last time we failed
1324  * but for that, we might need to clean up the copy state as it might
1325  * be in the middle of fetching the rows. Also, if there is a network
1326  * breakdown then it wouldn't have succeeded so trying it next time
1327  * seems like a better bet.
1328  */
1330  }
1331  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1332  {
1333  /*
1334  * The COPY phase was previously done, but tablesync then crashed
1335  * before it was able to finish normally.
1336  */
1338 
1339  /*
1340  * The origin tracking name must already exist. It was created first
1341  * time this tablesync was launched.
1342  */
1343  originid = replorigin_by_name(originname, false);
1344  replorigin_session_setup(originid, 0);
1345  replorigin_session_origin = originid;
1346  *origin_startpos = replorigin_session_get_progress(false);
1347 
1349 
1350  goto copy_table_done;
1351  }
1352 
1354  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1357 
1358  /* Update the state and make it visible to others. */
1365  pgstat_report_stat(true);
1366 
1368 
1369  /*
1370  * Use a standard write lock here. It might be better to disallow access
1371  * to the table while it's being synchronized. But we don't want to block
1372  * the main apply process from working and it has to open the relation in
1373  * RowExclusiveLock when remapping remote relation id to local one.
1374  */
1376 
1377  /*
1378  * Check that our table sync worker has permission to insert into the
1379  * target table.
1380  */
1381  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1382  ACL_INSERT);
1383  if (aclresult != ACLCHECK_OK)
1384  aclcheck_error(aclresult,
1385  get_relkind_objtype(rel->rd_rel->relkind),
1387 
1388  /*
1389  * COPY FROM does not honor RLS policies. That is not a problem for
1390  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1391  * who has it implicitly), but other roles should not be able to
1392  * circumvent RLS. Disallow logical replication into RLS enabled
1393  * relations for such roles.
1394  */
1396  ereport(ERROR,
1397  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1398  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1399  GetUserNameFromId(GetUserId(), true),
1400  RelationGetRelationName(rel))));
1401 
1402  /*
1403  * Start a transaction in the remote node in REPEATABLE READ mode. This
1404  * ensures that both the replication slot we create (see below) and the
1405  * COPY are consistent with each other.
1406  */
1408  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1409  0, NULL);
1410  if (res->status != WALRCV_OK_COMMAND)
1411  ereport(ERROR,
1412  (errcode(ERRCODE_CONNECTION_FAILURE),
1413  errmsg("table copy could not start transaction on publisher: %s",
1414  res->err)));
1416 
1417  /*
1418  * Create a new permanent logical decoding slot. This slot will be used
1419  * for the catchup phase after COPY is done, so tell it to use the
1420  * snapshot to make the final data consistent.
1421  */
1423  slotname, false /* permanent */ , false /* two_phase */ ,
1424  CRS_USE_SNAPSHOT, origin_startpos);
1425 
1426  /*
1427  * Setup replication origin tracking. The purpose of doing this before the
1428  * copy is to avoid doing the copy again due to any error in setting up
1429  * origin tracking.
1430  */
1431  originid = replorigin_by_name(originname, true);
1432  if (!OidIsValid(originid))
1433  {
1434  /*
1435  * Origin tracking does not exist, so create it now.
1436  *
1437  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1438  * logged for the purpose of recovery. Locks are to prevent the
1439  * replication origin from vanishing while advancing.
1440  */
1441  originid = replorigin_create(originname);
1442 
1443  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1444  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1445  true /* go backward */ , true /* WAL log */ );
1446  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1447 
1448  replorigin_session_setup(originid, 0);
1449  replorigin_session_origin = originid;
1450  }
1451  else
1452  {
1453  ereport(ERROR,
1455  errmsg("replication origin \"%s\" already exists",
1456  originname)));
1457  }
1458 
1459  /* Now do the initial data copy */
1461  copy_table(rel);
1463 
1464  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1465  if (res->status != WALRCV_OK_COMMAND)
1466  ereport(ERROR,
1467  (errcode(ERRCODE_CONNECTION_FAILURE),
1468  errmsg("table copy could not finish transaction on publisher: %s",
1469  res->err)));
1471 
1472  table_close(rel, NoLock);
1473 
1474  /* Make the copy visible. */
1476 
1477  /*
1478  * Update the persisted state to indicate the COPY phase is done; make it
1479  * visible to others.
1480  */
1483  SUBREL_STATE_FINISHEDCOPY,
1485 
1487 
1488 copy_table_done:
1489 
1490  elog(DEBUG1,
1491  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1492  originname, LSN_FORMAT_ARGS(*origin_startpos));
1493 
1494  /*
1495  * We are done with the initial data synchronization, update the state.
1496  */
1498  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1499  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1501 
1502  /*
1503  * Finally, wait until the leader apply worker tells us to catch up and
1504  * then return to let LogicalRepApplyLoop do it.
1505  */
1506  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1507  return slotname;
1508 }
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2673
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:3923
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:461
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:313
void err(int eval, const char *fmt,...)
Definition: err.c:43
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:984
Oid GetUserId(void)
Definition: miscinit.c:510
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1234
#define ACL_INSERT
Definition: parsenodes.h:83
#define NAMEDATALEN
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:504
#define RelationGetRelationName(relation)
Definition: rel.h:538
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:251
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:683
void PopActiveSnapshot(void)
Definition: snapmgr.c:778
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Form_pg_class rd_rel
Definition: rel.h:111
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:223
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1228
static void copy_table(Relation rel)
Definition: tablesync.c:1084
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:432
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:442
#define walrcv_connect(conninfo, logical, must_use_password, appname, err)
Definition: walreceiver.h:410
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:436
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1078
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog(), ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, Subscription::owner, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RLS_ENABLED, RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, superuser_arg(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3872 of file worker.c.

3873 {
3874  MemoryContext oldctx;
3876  bool started_tx = false;
3877 
3878  /* When cache state is valid there is nothing to do here. */
3879  if (MySubscriptionValid)
3880  return;
3881 
3882  /* This function might be called inside or outside of transaction. */
3883  if (!IsTransactionState())
3884  {
3886  started_tx = true;
3887  }
3888 
3889  /* Ensure allocations in permanent context. */
3891 
3893 
3894  /*
3895  * Exit if the subscription was removed. This normally should not happen
3896  * as the worker gets killed during DROP SUBSCRIPTION.
3897  */
3898  if (!newsub)
3899  {
3900  ereport(LOG,
3901  /* translator: first %s is the name of logical replication worker */
3902  (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
3904 
3905  /* Ensure we remove no-longer-useful entry for worker's start time */
3908  proc_exit(0);
3909  }
3910 
3911  /* Exit if the subscription was disabled. */
3912  if (!newsub->enabled)
3913  {
3914  ereport(LOG,
3915  /* translator: first %s is the name of logical replication worker */
3916  (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
3918 
3920  }
3921 
3922  /* !slotname should never happen when enabled is true. */
3923  Assert(newsub->slotname);
3924 
3925  /* two-phase should not be altered */
3926  Assert(newsub->twophasestate == MySubscription->twophasestate);
3927 
3928  /*
3929  * Exit if any parameter that affects the remote connection was changed.
3930  * The launcher will start a new worker but note that the parallel apply
3931  * worker won't restart if the streaming option's value is changed from
3932  * 'parallel' to any other value or the server decides not to stream the
3933  * in-progress transaction.
3934  */
3935  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3936  strcmp(newsub->name, MySubscription->name) != 0 ||
3937  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3938  newsub->binary != MySubscription->binary ||
3939  newsub->stream != MySubscription->stream ||
3940  newsub->passwordrequired != MySubscription->passwordrequired ||
3941  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3942  newsub->owner != MySubscription->owner ||
3943  !equal(newsub->publications, MySubscription->publications))
3944  {
3946  ereport(LOG,
3947  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3948  MySubscription->name)));
3949  else
3950  ereport(LOG,
3951  /* translator: first %s is the name of logical replication worker */
3952  (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
3954 
3956  }
3957 
3958  /* Check for other changes that should never happen too. */
3959  if (newsub->dbid != MySubscription->dbid)
3960  {
3961  elog(ERROR, "subscription %u changed unexpectedly",
3963  }
3964 
3965  /* Clean old subscription info and switch to new one. */
3968 
3969  MemoryContextSwitchTo(oldctx);
3970 
3971  /* Change synchronous commit according to the user's wishes */
3972  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3974 
3975  if (started_tx)
3977 
3978  MySubscriptionValid = true;
3979 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
bool IsTransactionState(void)
Definition: xact.c:378

References am_parallel_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog(), equal(), ereport, errmsg(), ERROR, FreeSubscription(), get_worker_name(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)

Definition at line 469 of file applyparallelworker.c.

470 {
471  bool found;
472  ParallelApplyWorkerInfo *winfo = NULL;
474 
475  if (!pa_can_start())
476  return;
477 
478  winfo = pa_launch_parallel_worker();
479  if (!winfo)
480  return;
481 
482  /* First time through, initialize parallel apply worker state hashtable. */
484  {
485  HASHCTL ctl;
486 
487  MemSet(&ctl, 0, sizeof(ctl));
488  ctl.keysize = sizeof(TransactionId);
489  ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
490  ctl.hcxt = ApplyContext;
491 
492  ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
493  16, &ctl,
495  }
496 
497  /* Create an entry for the requested transaction. */
498  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
499  if (found)
500  elog(ERROR, "hash table corrupted");
501 
502  /* Update the transaction information in shared memory. */
503  SpinLockAcquire(&winfo->shared->mutex);
505  winfo->shared->xid = xid;
506  SpinLockRelease(&winfo->shared->mutex);
507 
508  winfo->in_use = true;
509  winfo->serialize_changes = false;
510  entry->winfo = winfo;
511  entry->xid = xid;
512 }
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
static HTAB * ParallelApplyTxnHash
#define MemSet(start, val, len)
Definition: c.h:1004
uint32 TransactionId
Definition: c.h:636
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
ParallelApplyWorkerInfo * winfo
ParallelTransState xact_state

References ApplyContext, elog(), HASHCTL::entrysize, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, ParallelApplyWorkerInfo::in_use, HASHCTL::keysize, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, ParallelApplyWorkerEntry::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1594 of file applyparallelworker.c.

1595 {
1597 
1598  /*
1599  * It is only possible to not have any pending stream chunks when we are
1600  * applying spooled messages.
1601  */
1603  {
1605  return;
1606 
1607  elog(ERROR, "invalid pending streaming chunk 0");
1608  }
1609 
1611  {
1614  }
1615 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_has_spooled_message_pending()
ParallelApplyWorkerShared * MyParallelShared
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:396
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:236
#define AccessShareLock
Definition: lockdefs.h:36
pg_atomic_uint32 pending_stream_count

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog(), ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 622 of file applyparallelworker.c.

623 {
624  ListCell *lc;
625 
626  foreach(lc, ParallelApplyWorkerPool)
627  {
629 
630  if (winfo->error_mq_handle)
631  {
633  winfo->error_mq_handle = NULL;
634  }
635  }
636 }
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition: pg_list.h:172

References ParallelApplyWorkerInfo::error_mq_handle, lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo* pa_find_worker ( TransactionId  xid)

Definition at line 518 of file applyparallelworker.c.

519 {
520  bool found;
522 
523  if (!TransactionIdIsValid(xid))
524  return NULL;
525 
527  return NULL;
528 
529  /* Return the cached parallel apply worker if valid. */
531  return stream_apply_worker;
532 
533  /* Find an entry for the requested transaction. */
534  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
535  if (found)
536  {
537  /* The worker must not have exited. */
538  Assert(entry->winfo->in_use);
539  return entry->winfo;
540  }
541 
542  return NULL;
543 }
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition: hsearch.h:113

References Assert(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1405 of file applyparallelworker.c.

1406 {
1407  /*
1408  * We don't need to free this explicitly as the allocated memory will be
1409  * freed at the transaction end.
1410  */
1411  subxactlist = NIL;
1412 }
static List * subxactlist

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo winfo,
Size  nbytes,
const void *  data 
)

Definition at line 1149 of file applyparallelworker.c.

1150 {
1151  int rc;
1152  shm_mq_result result;
1153  TimestampTz startTime = 0;
1154 
1156  Assert(!winfo->serialize_changes);
1157 
1158  /*
1159  * We don't try to send data to parallel worker for 'immediate' mode. This
1160  * is primarily used for testing purposes.
1161  */
1163  return false;
1164 
1165 /*
1166  * This timeout is a bit arbitrary but testing revealed that it is sufficient
1167  * to send the message unless the parallel apply worker is waiting on some
1168  * lock or there is a serious resource crunch. See the comments atop this file
1169  * to know why we are using a non-blocking way to send the message.
1170  */
1171 #define SHM_SEND_RETRY_INTERVAL_MS 1000
1172 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1173 
1174  for (;;)
1175  {
1176  result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1177 
1178  if (result == SHM_MQ_SUCCESS)
1179  return true;
1180  else if (result == SHM_MQ_DETACHED)
1181  ereport(ERROR,
1182  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1183  errmsg("could not send data to shared-memory queue")));
1184 
1185  Assert(result == SHM_MQ_WOULD_BLOCK);
1186 
1187  /* Wait before retrying. */
1188  rc = WaitLatch(MyLatch,
1192 
1193  if (rc & WL_LATCH_SET)
1194  {
1197  }
1198 
1199  if (startTime == 0)
1200  startTime = GetCurrentTimestamp();
1201  else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1203  return false;
1204  }
1205 }
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
#define unlikely(x)
Definition: c.h:295
struct Latch * MyLatch
Definition: globals.c:58
void ResetLatch(Latch *latch)
Definition: latch.c:699
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:492
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
const void * data
int logical_replication_mode
@ LOGICAL_REP_MODE_IMMEDIATE
Definition: reorderbuffer.h:28
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
shm_mq_handle * mq_handle
@ WAIT_EVENT_LOGICAL_APPLY_SEND_DATA
Definition: wait_event.h:109

References Assert(), CHECK_FOR_INTERRUPTS, data, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), LOGICAL_REP_MODE_IMMEDIATE, logical_replication_mode, ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WAIT_EVENT_LOGICAL_APPLY_SEND_DATA, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo winfo)

Definition at line 1337 of file applyparallelworker.c.

1338 {
1339  stream_apply_worker = winfo;
1340 }

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared wshared,
ParallelTransState  xact_state 
)

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1365 of file applyparallelworker.c.

1366 {
1367  if (current_xid != top_xid &&
1368  !list_member_xid(subxactlist, current_xid))
1369  {
1370  MemoryContext oldctx;
1371  char spname[NAMEDATALEN];
1372 
1373  pa_savepoint_name(MySubscription->oid, current_xid,
1374  spname, sizeof(spname));
1375 
1376  elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1377 
1378  /* We must be in transaction block to define the SAVEPOINT. */
1379  if (!IsTransactionBlock())
1380  {
1381  if (!IsTransactionState())
1383 
1386  }
1387 
1388  DefineSavepoint(spname);
1389 
1390  /*
1391  * CommitTransactionCommand is needed to start a subtransaction after
1392  * issuing a SAVEPOINT inside a transaction block (see
1393  * StartSubTransaction()).
1394  */
1396 
1398  subxactlist = lappend_xid(subxactlist, current_xid);
1399  MemoryContextSwitchTo(oldctx);
1400  }
1401 }
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741
void DefineSavepoint(const char *name)
Definition: xact.c:4218
bool IsTransactionBlock(void)
Definition: xact.c:4815
void BeginTransactionBlock(void)
Definition: xact.c:3769

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog(), IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData abort_data)

Definition at line 1419 of file applyparallelworker.c.

1420 {
1421  TransactionId xid = abort_data->xid;
1422  TransactionId subxid = abort_data->subxid;
1423 
1424  /*
1425  * Update origin state so we can restart streaming from correct position
1426  * in case of crash.
1427  */
1430 
1431  /*
1432  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1433  * just free the subxactlist.
1434  */
1435  if (subxid == xid)
1436  {
1438 
1439  /*
1440  * Release the lock as we might be processing an empty streaming
1441  * transaction in which case the lock won't be released during
1442  * transaction rollback.
1443  *
1444  * Note that it's ok to release the transaction lock before aborting
1445  * the transaction because even if the parallel apply worker dies due
1446  * to crash or some other reason, such a transaction would still be
1447  * considered aborted.
1448  */
1450 
1452 
1453  if (IsTransactionBlock())
1454  {
1455  EndTransactionBlock(false);
1457  }
1458 
1460 
1462  }
1463  else
1464  {
1465  /* OK, so it's a subxact. Rollback to the savepoint. */
1466  int i;
1467  char spname[NAMEDATALEN];
1468 
1469  pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1470 
1471  elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1472 
1473  /*
1474  * Search the subxactlist, determine the offset tracked for the
1475  * subxact, and truncate the list.
1476  *
1477  * Note that for an empty sub-transaction we won't find the subxid
1478  * here.
1479  */
1480  for (i = list_length(subxactlist) - 1; i >= 0; i--)
1481  {
1483 
1484  if (xid_tmp == subxid)
1485  {
1486  RollbackToSavepoint(spname);
1489  break;
1490  }
1491  }
1492  }
1493 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
@ STATE_IDLE
List * list_truncate(List *list, int new_size)
Definition: list.c:630
#define AccessExclusiveLock
Definition: lockdefs.h:43
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
static int list_length(const List *l)
Definition: pg_list.h:152
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
void RollbackToSavepoint(const char *name)
Definition: xact.c:4412
bool EndTransactionBlock(bool chain)
Definition: xact.c:3889
void AbortCurrentTransaction(void)
Definition: xact.c:3304

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog(), EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo winfo,
bool  stream_locked 
)

Definition at line 1214 of file applyparallelworker.c.

1216 {
1217  ereport(LOG,
1218  (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1219  winfo->shared->xid)));
1220 
1221  /*
1222  * The parallel apply worker could be stuck for some reason (say waiting
1223  * on some lock by other backend), so stop trying to send data directly to
1224  * it and start serializing data to the file instead.
1225  */
1226  winfo->serialize_changes = true;
1227 
1228  /* Initialize the stream fileset. */
1229  stream_start_internal(winfo->shared->xid, true);
1230 
1231  /*
1232  * Acquires the stream lock if not already to make sure that the parallel
1233  * apply worker will wait for the leader to release the stream lock until
1234  * the end of the transaction.
1235  */
1236  if (!stream_locked)
1238 
1240 }
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1453

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_unlock_transaction()

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo winfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1621 of file applyparallelworker.c.

1622 {
1624 
1625  /*
1626  * Unlock the shared object lock so that parallel apply worker can
1627  * continue to receive and apply changes.
1628  */
1630 
1631  /*
1632  * Wait for that worker to finish. This is necessary to maintain commit
1633  * order which avoids failures due to transaction dependencies and
1634  * deadlocks.
1635  */
1636  pa_wait_for_xact_finish(winfo);
1637 
1638  if (!XLogRecPtrIsInvalid(remote_lsn))
1639  store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1640 
1641  pa_free_worker(winfo);
1642 }
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3449

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 647 of file tablesync.c.

648 {
649  /*
650  * Skip for parallel apply workers because they only operate on tables
651  * that are in a READY state. See pa_can_start() and
652  * should_apply_changes_for_rel().
653  */
655  return;
656 
657  if (am_tablesync_worker())
658  process_syncing_tables_for_sync(current_lsn);
659  else
661 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:409
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:286

References am_parallel_apply_worker(), am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 461 of file worker.c.

463 {
464  if (OidIsValid(relid))
465  {
466  /* Replication origin name for tablesync workers. */
467  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
468  }
469  else
470  {
471  /* Replication origin name for non-tablesync workers. */
472  snprintf(originname, szoriginname, "pg_%u", suboid);
473  }
474 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), ApplyWorkerMain(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 5058 of file worker.c.

5059 {
5061  originname);
5062 }
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1631

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3449 of file worker.c.

3450 {
3451  FlushPosition *flushpos;
3452 
3453  /*
3454  * Skip for parallel apply workers, because the lsn_mapping is maintained
3455  * by the leader apply worker.
3456  */
3458  return;
3459 
3460  /* Need to do this in permanent context */
3462 
3463  /* Track commit lsn */
3464  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3465  flushpos->local_end = local_lsn;
3466  flushpos->remote_end = remote_lsn;
3467 
3468  dlist_push_tail(&lsn_mapping, &flushpos->node);
3470 }
static dlist_head lsn_mapping
Definition: worker.c:221
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:216
XLogRecPtr remote_end
Definition: worker.c:218
XLogRecPtr local_end
Definition: worker.c:217

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 4199 of file worker.c.

4200 {
4201  char path[MAXPGPATH];
4202 
4203  /* Delete the changes file. */
4204  changes_filename(path, subid, xid);
4206 
4207  /* Delete the subxact file, if it exists. */
4208  subxact_filename(path, subid, xid);
4210 }
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4178
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1453 of file worker.c.

1454 {
1456 
1457  /*
1458  * Initialize the worker's stream_fileset if we haven't yet. This will be
1459  * used for the entire duration of the worker so create it in a permanent
1460  * context. We create this on the very first streaming message from any
1461  * transaction and then use it for this and other streaming transactions.
1462  * Now, we could create a fileset at the start of the worker as well but
1463  * then we won't be sure that it will ever be used.
1464  */
1466  {
1467  MemoryContext oldctx;
1468 
1470 
1473 
1474  MemoryContextSwitchTo(oldctx);
1475  }
1476 
1477  /* Open the spool file for this transaction. */
1478  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1479 
1480  /* If this is not the first segment, open existing subxact file. */
1481  if (!first_segment)
1483 
1485 }
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4223
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4049
void FileSetInit(FileSet *fileset)
Definition: fileset.c:54

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1627 of file worker.c.

1628 {
1629  /*
1630  * Serialize information about subxacts for the toplevel transaction, then
1631  * close the stream messages spool file.
1632  */
1635 
1636  /* We must be in a valid transaction state */
1638 
1639  /* Commit the per-stream transaction */
1641 
1642  /* Reset per-stream context */
1644 }
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4000
static MemoryContext LogicalStreamingContext
Definition: worker.c:311

References Assert(), CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1605 of file tablesync.c.

1606 {
1607  Relation rel;
1608  HeapTuple tup;
1609  bool nulls[Natts_pg_subscription];
1610  bool replaces[Natts_pg_subscription];
1611  Datum values[Natts_pg_subscription];
1612 
1614  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1615  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1616 
1617  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1619  if (!HeapTupleIsValid(tup))
1620  elog(ERROR,
1621  "cache lookup failed for subscription oid %u",
1622  suboid);
1623 
1624  /* Form a new tuple. */
1625  memset(values, 0, sizeof(values));
1626  memset(nulls, false, sizeof(nulls));
1627  memset(replaces, false, sizeof(replaces));
1628 
1629  /* And update/set two_phase state */
1630  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1631  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1632 
1633  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1634  values, nulls, replaces);
1635  CatalogTupleUpdate(rel, &tup->t_self, tup);
1636 
1637  heap_freetuple(tup);
1639 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:182

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), and CreateSubscription().

Variable Documentation

◆ apply_error_context_stack

PGDLLIMPORT ErrorContextCallback* apply_error_context_stack
extern

Definition at line 305 of file worker.c.

Referenced by HandleParallelApplyMessage(), and LogicalRepApplyLoop().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ InitializingApplyWorker

PGDLLIMPORT bool InitializingApplyWorker
extern

Definition at line 335 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogRepWorkerWalRcvConn

◆ MyLogicalRepWorker

◆ MyParallelShared

◆ MySubscription