PostgreSQL Source Code  git master
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 
struct  ParallelApplyWorkerShared
 
struct  ParallelApplyWorkerInfo
 

Macros

#define isParallelApplyWorker(worker)
 
#define isTablesyncWorker(worker)
 

Typedefs

typedef enum LogicalRepWorkerType LogicalRepWorkerType
 
typedef struct LogicalRepWorker LogicalRepWorker
 
typedef enum ParallelTransState ParallelTransState
 
typedef enum PartialFileSetState PartialFileSetState
 
typedef struct ParallelApplyWorkerShared ParallelApplyWorkerShared
 
typedef struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
 

Enumerations

enum  LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0 , WORKERTYPE_TABLESYNC , WORKERTYPE_APPLY , WORKERTYPE_PARALLEL_APPLY }
 
enum  ParallelTransState { PARALLEL_TRANS_UNKNOWN , PARALLEL_TRANS_STARTED , PARALLEL_TRANS_FINISHED }
 
enum  PartialFileSetState { FS_EMPTY , FS_SERIALIZE_IN_PROGRESS , FS_SERIALIZE_DONE , FS_READY }
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorker * logicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
List * logicalrep_workers_find (Oid subid, bool only_running)
 
bool logicalrep_worker_launch (LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForLogicalRep (Oid suboid, Oid relid, char *originname, Size szoriginname)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
void stream_start_internal (TransactionId xid, bool first_segment)
 
void stream_stop_internal (TransactionId xid)
 
void apply_spooled_messages (FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
 
void apply_dispatch (StringInfo s)
 
void maybe_reread_subscription (void)
 
void stream_cleanup_files (Oid subid, TransactionId xid)
 
void set_stream_options (WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
 
void start_apply (XLogRecPtr origin_startpos)
 
void InitializeLogRepWorker (void)
 
void SetupApplyOrSyncWorker (int worker_slot)
 
void DisableSubscriptionAndExit (void)
 
void store_flush_position (XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 
void apply_error_callback (void *arg)
 
void set_apply_error_context_origin (char *originname)
 
void pa_allocate_worker (TransactionId xid)
 
ParallelApplyWorkerInfo * pa_find_worker (TransactionId xid)
 
void pa_detach_all_error_mq (void)
 
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
 
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
 
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void pa_reset_subtrans (void)
 
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
 
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
 
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
 
void pa_decr_and_wait_stream_block (void)
 
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
static bool am_tablesync_worker (void)
 
static bool am_leader_apply_worker (void)
 
static bool am_parallel_apply_worker (void)
 

Variables

PGDLLIMPORT MemoryContext ApplyContext
 
PGDLLIMPORT MemoryContext ApplyMessageContext
 
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
 
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
 
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
 
PGDLLIMPORT Subscription * MySubscription
 
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
 
PGDLLIMPORT bool in_remote_transaction
 
PGDLLIMPORT bool InitializingApplyWorker
 

Macro Definition Documentation

◆ isParallelApplyWorker

#define isParallelApplyWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
@ WORKERTYPE_PARALLEL_APPLY

Definition at line 330 of file worker_internal.h.

◆ isTablesyncWorker

#define isTablesyncWorker (   worker)
Value:
((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
@ WORKERTYPE_TABLESYNC

Definition at line 332 of file worker_internal.h.

Typedef Documentation

◆ LogicalRepWorker

◆ LogicalRepWorkerType

◆ ParallelApplyWorkerInfo

◆ ParallelApplyWorkerShared

◆ ParallelTransState

◆ PartialFileSetState

Enumeration Type Documentation

◆ LogicalRepWorkerType

Enumerator
WORKERTYPE_UNKNOWN 
WORKERTYPE_TABLESYNC 
WORKERTYPE_APPLY 
WORKERTYPE_PARALLEL_APPLY 

Definition at line 31 of file worker_internal.h.

32 {
LogicalRepWorkerType
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_APPLY

◆ ParallelTransState

Enumerator
PARALLEL_TRANS_UNKNOWN 
PARALLEL_TRANS_STARTED 
PARALLEL_TRANS_FINISHED 

Definition at line 105 of file worker_internal.h.

106 {
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED

◆ PartialFileSetState

Enumerator
FS_EMPTY 
FS_SERIALIZE_IN_PROGRESS 
FS_SERIALIZE_DONE 
FS_READY 

Definition at line 128 of file worker_internal.h.

129 {
130  FS_EMPTY,
133  FS_READY,
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1703 of file tablesync.c.

1704 {
1705  bool started_tx = false;
1706  bool has_subrels = false;
1707 
1708  /* We need up-to-date sync state info for subscription tables here. */
1709  has_subrels = FetchTableStates(&started_tx);
1710 
1711  if (started_tx)
1712  {
1714  pgstat_report_stat(true);
1715  }
1716 
1717  /*
1718  * Return false when there are no tables in subscription or not all tables
1719  * are in ready state; true otherwise.
1720  */
1721  return has_subrels && (table_states_not_ready == NIL);
1722 }
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:582
static List * table_states_not_ready
Definition: tablesync.c:127
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1554
void CommitTransactionCommand(void)
Definition: xact.c:3034

References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), process_syncing_tables_for_apply(), and run_apply_worker().

◆ am_leader_apply_worker()

static bool am_leader_apply_worker ( void  )
inline static

◆ am_parallel_apply_worker()

◆ am_tablesync_worker()

static bool am_tablesync_worker ( void  )
inline static

◆ apply_dispatch()

void apply_dispatch ( StringInfo  s)

Definition at line 3290 of file worker.c.

3291 {
3293  LogicalRepMsgType saved_command;
3294 
3295  /*
3296  * Set the current command being applied. Since this function can be
3297  * called recursively when applying spooled changes, save the current
3298  * command.
3299  */
3300  saved_command = apply_error_callback_arg.command;
3302 
3303  switch (action)
3304  {
3305  case LOGICAL_REP_MSG_BEGIN:
3306  apply_handle_begin(s);
3307  break;
3308 
3311  break;
3312 
3315  break;
3316 
3319  break;
3320 
3323  break;
3324 
3327  break;
3328 
3331  break;
3332 
3333  case LOGICAL_REP_MSG_TYPE:
3334  apply_handle_type(s);
3335  break;
3336 
3339  break;
3340 
3342 
3343  /*
3344  * Logical replication does not use generic logical messages yet.
3345  * Although, it could be used by other applications that use this
3346  * output plugin.
3347  */
3348  break;
3349 
3352  break;
3353 
3356  break;
3357 
3360  break;
3361 
3364  break;
3365 
3368  break;
3369 
3372  break;
3373 
3376  break;
3377 
3380  break;
3381 
3384  break;
3385 
3386  default:
3387  ereport(ERROR,
3388  (errcode(ERRCODE_PROTOCOL_VIOLATION),
3389  errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3390  }
3391 
3392  /* Reset the current command */
3393  apply_error_callback_arg.command = saved_command;
3394 }
static void apply_handle_stream_prepare(StringInfo s)
Definition: worker.c:1289
static void apply_handle_type(StringInfo s)
Definition: worker.c:2342
static void apply_handle_truncate(StringInfo s)
Definition: worker.c:3162
static void apply_handle_update(StringInfo s)
Definition: worker.c:2538
static void apply_handle_stream_commit(StringInfo s)
Definition: worker.c:2149
static void apply_handle_commit_prepared(StringInfo s)
Definition: worker.c:1187
ApplyErrorCallbackArg apply_error_callback_arg
Definition: worker.c:296
static void apply_handle_delete(StringInfo s)
Definition: worker.c:2722
static void apply_handle_begin(StringInfo s)
Definition: worker.c:1009
static void apply_handle_commit(StringInfo s)
Definition: worker.c:1034
static void apply_handle_stream_abort(StringInfo s)
Definition: worker.c:1830
static void apply_handle_relation(StringInfo s)
Definition: worker.c:2319
static void apply_handle_prepare(StringInfo s)
Definition: worker.c:1126
static void apply_handle_rollback_prepared(StringInfo s)
Definition: worker.c:1236
static void apply_handle_stream_stop(StringInfo s)
Definition: worker.c:1644
static void apply_handle_origin(StringInfo s)
Definition: worker.c:1426
static void apply_handle_begin_prepare(StringInfo s)
Definition: worker.c:1060
static void apply_handle_stream_start(StringInfo s)
Definition: worker.c:1485
static void apply_handle_insert(StringInfo s)
Definition: worker.c:2389
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
LogicalRepMsgType command
Definition: worker.c:239

References generate_unaccent_rules::action, apply_error_callback_arg, apply_handle_begin(), apply_handle_begin_prepare(), apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_delete(), apply_handle_insert(), apply_handle_origin(), apply_handle_prepare(), apply_handle_relation(), apply_handle_rollback_prepared(), apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), apply_handle_truncate(), apply_handle_type(), apply_handle_update(), ApplyErrorCallbackArg::command, ereport, errcode(), errmsg(), ERROR, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and pq_getmsgbyte().

Referenced by apply_spooled_messages(), LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_error_callback()

void apply_error_callback ( void *  arg)

Definition at line 4911 of file worker.c.

4912 {
4914 
4916  return;
4917 
4918  Assert(errarg->origin_name);
4919 
4920  if (errarg->rel == NULL)
4921  {
4922  if (!TransactionIdIsValid(errarg->remote_xid))
4923  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4924  errarg->origin_name,
4925  logicalrep_message_type(errarg->command));
4926  else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4927  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4928  errarg->origin_name,
4930  errarg->remote_xid);
4931  else
4932  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4933  errarg->origin_name,
4935  errarg->remote_xid,
4936  LSN_FORMAT_ARGS(errarg->finish_lsn));
4937  }
4938  else
4939  {
4940  if (errarg->remote_attnum < 0)
4941  {
4942  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4943  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4944  errarg->origin_name,
4946  errarg->rel->remoterel.nspname,
4947  errarg->rel->remoterel.relname,
4948  errarg->remote_xid);
4949  else
4950  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4951  errarg->origin_name,
4953  errarg->rel->remoterel.nspname,
4954  errarg->rel->remoterel.relname,
4955  errarg->remote_xid,
4956  LSN_FORMAT_ARGS(errarg->finish_lsn));
4957  }
4958  else
4959  {
4960  if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4961  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4962  errarg->origin_name,
4964  errarg->rel->remoterel.nspname,
4965  errarg->rel->remoterel.relname,
4966  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4967  errarg->remote_xid);
4968  else
4969  errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4970  errarg->origin_name,
4972  errarg->rel->remoterel.nspname,
4973  errarg->rel->remoterel.relname,
4974  errarg->rel->remoterel.attnames[errarg->remote_attnum],
4975  errarg->remote_xid,
4976  LSN_FORMAT_ARGS(errarg->finish_lsn));
4977  }
4978  }
4979 }
#define errcontext
Definition: elog.h:196
const char * logicalrep_message_type(LogicalRepMsgType action)
Definition: proto.c:1217
TransactionId remote_xid
Definition: worker.c:244
XLogRecPtr finish_lsn
Definition: worker.c:245
LogicalRepRelMapEntry * rel
Definition: worker.c:240
LogicalRepRelation remoterel
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References apply_error_callback_arg, Assert(), LogicalRepRelation::attnames, ApplyErrorCallbackArg::command, errcontext, ApplyErrorCallbackArg::finish_lsn, logicalrep_message_type(), LSN_FORMAT_ARGS, LogicalRepRelation::nspname, ApplyErrorCallbackArg::origin_name, ApplyErrorCallbackArg::rel, LogicalRepRelation::relname, ApplyErrorCallbackArg::remote_attnum, ApplyErrorCallbackArg::remote_xid, LogicalRepRelMapEntry::remoterel, TransactionIdIsValid, and XLogRecPtrIsInvalid.

Referenced by LogicalParallelApplyLoop(), and LogicalRepApplyLoop().

◆ apply_spooled_messages()

void apply_spooled_messages ( FileSet * stream_fileset,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 2019 of file worker.c.

2021 {
2022  int nchanges;
2023  char path[MAXPGPATH];
2024  char *buffer = NULL;
2025  MemoryContext oldcxt;
2026  ResourceOwner oldowner;
2027  int fileno;
2028  off_t offset;
2029 
2030  if (!am_parallel_apply_worker())
2032 
2033  /* Make sure we have an open transaction */
2035 
2036  /*
2037  * Allocate file handle and memory required to process all the messages in
2038  * TopTransactionContext to avoid them getting reset after each message is
2039  * processed.
2040  */
2042 
2043  /* Open the spool file for the committed/prepared transaction */
2045  elog(DEBUG1, "replaying changes from file \"%s\"", path);
2046 
2047  /*
2048  * Make sure the file is owned by the toplevel transaction so that the
2049  * file will not be accidentally closed when aborting a subtransaction.
2050  */
2051  oldowner = CurrentResourceOwner;
2053 
2054  stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2055 
2056  CurrentResourceOwner = oldowner;
2057 
2058  buffer = palloc(BLCKSZ);
2059 
2060  MemoryContextSwitchTo(oldcxt);
2061 
2062  remote_final_lsn = lsn;
2063 
2064  /*
2065  * Make sure the handle apply_dispatch methods are aware we're in a remote
2066  * transaction.
2067  */
2068  in_remote_transaction = true;
2070 
2072 
2073  /*
2074  * Read the entries one by one and pass them through the same logic as in
2075  * apply_dispatch.
2076  */
2077  nchanges = 0;
2078  while (true)
2079  {
2081  size_t nbytes;
2082  int len;
2083 
2085 
2086  /* read length of the on-disk record */
2087  nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2088 
2089  /* have we reached end of the file? */
2090  if (nbytes == 0)
2091  break;
2092 
2093  /* do we have a correct length? */
2094  if (len <= 0)
2095  elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2096  len, path);
2097 
2098  /* make sure we have sufficiently large buffer */
2099  buffer = repalloc(buffer, len);
2100 
2101  /* and finally read the data into the buffer */
2102  BufFileReadExact(stream_fd, buffer, len);
2103 
2104  BufFileTell(stream_fd, &fileno, &offset);
2105 
2106  /* init a stringinfo using the buffer and call apply_dispatch */
2107  initReadOnlyStringInfo(&s2, buffer, len);
2108 
2109  /* Ensure we are reading the data into our memory context. */
2111 
2112  apply_dispatch(&s2);
2113 
2115 
2116  MemoryContextSwitchTo(oldcxt);
2117 
2118  nchanges++;
2119 
2120  /*
2121  * It is possible the file has been closed because we have processed
2122  * the transaction end message like stream_commit in which case that
2123  * must be the last message.
2124  */
2125  if (!stream_fd)
2126  {
2127  ensure_last_message(stream_fileset, xid, fileno, offset);
2128  break;
2129  }
2130 
2131  if (nchanges % 1000 == 0)
2132  elog(DEBUG1, "replayed %d changes from file \"%s\"",
2133  nchanges, path);
2134  }
2135 
2136  if (stream_fd)
2138 
2139  elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2140  nchanges, path);
2141 
2142  return;
2143 }
static void begin_replication_step(void)
Definition: worker.c:526
static void end_replication_step(void)
Definition: worker.c:549
MemoryContext ApplyMessageContext
Definition: worker.c:308
static void changes_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4209
static BufFile * stream_fd
Definition: worker.c:357
bool in_remote_transaction
Definition: worker.c:321
void apply_dispatch(StringInfo s)
Definition: worker.c:3290
static void ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, off_t offset)
Definition: worker.c:1987
static XLogRecPtr remote_final_lsn
Definition: worker.c:322
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn)
Definition: worker.c:4781
static void stream_close_file(void)
Definition: worker.c:4292
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
Definition: buffile.c:654
BufFile * BufFileOpenFileSet(FileSet *fileset, const char *name, int mode, bool missing_ok)
Definition: buffile.c:291
void BufFileTell(BufFile *file, int *fileno, off_t *offset)
Definition: buffile.c:833
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
Definition: buffile.c:664
#define DEBUG1
Definition: elog.h:30
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:330
MemoryContext TopTransactionContext
Definition: mcxt.c:146
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1476
void * palloc(Size size)
Definition: mcxt.c:1226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
#define MAXPGPATH
const void size_t len
char * s2
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:166
ResourceOwner CurrentResourceOwner
Definition: resowner.c:164
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:129
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), apply_dispatch(), ApplyMessageContext, begin_replication_step(), BufFileOpenFileSet(), BufFileReadExact(), BufFileReadMaybeEOF(), BufFileTell(), changes_filename(), CHECK_FOR_INTERRUPTS, CurrentResourceOwner, DEBUG1, elog(), end_replication_step(), ensure_last_message(), ERROR, in_remote_transaction, initReadOnlyStringInfo(), len, MAXPGPATH, maybe_start_skipping_changes(), MemoryContextReset(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), pgstat_report_activity(), remote_final_lsn, repalloc(), s2, STATE_RUNNING, stream_close_file(), stream_fd, LogicalRepWorker::subid, TopTransactionContext, and TopTransactionResourceOwner.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_process_spooled_messages_if_required().

◆ DisableSubscriptionAndExit()

void DisableSubscriptionAndExit ( void  )

Definition at line 4723 of file worker.c.

4724 {
4725  /*
4726  * Emit the error message, and recover from the error state to an idle
4727  * state
4728  */
4729  HOLD_INTERRUPTS();
4730 
4731  EmitErrorReport();
4733  FlushErrorState();
4734 
4736 
4737  /* Report the worker failed during either table synchronization or apply */
4739  !am_tablesync_worker());
4740 
4741  /* Disable the subscription */
4745 
4746  /* Ensure we remove no-longer-useful entry for worker's start time */
4747  if (am_leader_apply_worker())
4749 
4750  /* Notify the subscription has been disabled and exit */
4751  ereport(LOG,
4752  errmsg("subscription \"%s\" has been disabled because of an error",
4753  MySubscription->name));
4754 
4755  proc_exit(0);
4756 }
Subscription * MySubscription
Definition: worker.c:316
void EmitErrorReport(void)
Definition: elog.c:1669
void FlushErrorState(void)
Definition: elog.c:1825
#define LOG
Definition: elog.h:31
void proc_exit(int code)
Definition: ipc.c:104
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1074
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
void DisableSubscription(Oid subid)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
void StartTransactionCommand(void)
Definition: xact.c:2937
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4712

References AbortOutOfAnyTransaction(), am_leader_apply_worker(), am_tablesync_worker(), ApplyLauncherForgetWorkerStartTime(), CommitTransactionCommand(), DisableSubscription(), EmitErrorReport(), ereport, errmsg(), FlushErrorState(), HOLD_INTERRUPTS, LOG, MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pgstat_report_subscription_error(), proc_exit(), RESUME_INTERRUPTS, StartTransactionCommand(), and LogicalRepWorker::subid.

Referenced by start_apply(), and start_table_sync().

◆ InitializeLogRepWorker()

void InitializeLogRepWorker ( void  )

Definition at line 4579 of file worker.c.

4580 {
4581  MemoryContext oldctx;
4582 
4583  /* Run as replica session replication role. */
4584  SetConfigOption("session_replication_role", "replica",
4586 
4587  /* Connect to our database. */
4590  0);
4591 
4592  /*
4593  * Set always-secure search path, so malicious users can't redirect user
4594  * code (e.g. pg_index.indexprs).
4595  */
4596  SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4597 
4598  /* Load the subscription into persistent memory context. */
4600  "ApplyContext",
4604 
4606  if (!MySubscription)
4607  {
4608  ereport(LOG,
4609  (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4611 
4612  /* Ensure we remove no-longer-useful entry for worker's start time */
4613  if (am_leader_apply_worker())
4615 
4616  proc_exit(0);
4617  }
4618 
4619  MySubscriptionValid = true;
4620  MemoryContextSwitchTo(oldctx);
4621 
4622  if (!MySubscription->enabled)
4623  {
4624  ereport(LOG,
4625  (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4626  MySubscription->name)));
4627 
4629  }
4630 
4631  /* Setup synchronous commit according to the user's wishes */
4632  SetConfigOption("synchronous_commit", MySubscription->synccommit,
4634 
4635  /*
4636  * Keep us informed about subscription or role changes. Note that the
4637  * role's superuser privilege can be revoked.
4638  */
4641  (Datum) 0);
4642 
4645  (Datum) 0);
4646 
4647  if (am_tablesync_worker())
4648  ereport(LOG,
4649  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4652  else
4653  ereport(LOG,
4654  (errmsg("logical replication apply worker for subscription \"%s\" has started",
4655  MySubscription->name)));
4656 
4658 }
static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: worker.c:4009
static void apply_worker_exit(void)
Definition: worker.c:3849
MemoryContext ApplyContext
Definition: worker.c:309
static bool MySubscriptionValid
Definition: worker.c:317
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Definition: guc.c:4206
@ PGC_S_OVERRIDE
Definition: guc.h:119
@ PGC_SUSET
Definition: guc.h:74
@ PGC_BACKEND
Definition: guc.h:73
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1518
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1932
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:126
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:150
Subscription * GetSubscription(Oid subid, bool missing_ok)
uintptr_t Datum
Definition: postgres.h:64
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: postmaster.c:5596
@ AUTHOID
Definition: syscache.h:45
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, am_leader_apply_worker(), am_tablesync_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), AUTHOID, BackgroundWorkerInitializeConnectionByOid(), CacheRegisterSyscacheCallback(), CommitTransactionCommand(), LogicalRepWorker::dbid, Subscription::enabled, ereport, errmsg(), get_rel_name(), GetSubscription(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, PGC_BACKEND, PGC_S_OVERRIDE, PGC_SUSET, proc_exit(), LogicalRepWorker::relid, SetConfigOption(), StartTransactionCommand(), LogicalRepWorker::subid, subscription_change_cb(), SUBSCRIPTIONOID, Subscription::synccommit, TopMemoryContext, and LogicalRepWorker::userid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 274 of file tablesync.c.

275 {
276  table_states_valid = false;
277 }
static bool table_states_valid
Definition: tablesync.c:126

References table_states_valid.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo * winfo)

Definition at line 639 of file launcher.c.

640 {
641  int slot_no;
642  uint16 generation;
643  LogicalRepWorker *worker;
644 
645  SpinLockAcquire(&winfo->shared->mutex);
646  generation = winfo->shared->logicalrep_worker_generation;
647  slot_no = winfo->shared->logicalrep_worker_slot_no;
648  SpinLockRelease(&winfo->shared->mutex);
649 
650  Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
651 
652  /*
653  * Detach from the error_mq_handle for the parallel apply worker before
654  * stopping it. This prevents the leader apply worker from trying to
655  * receive the message from the error queue that might already be detached
656  * by the parallel apply worker.
657  */
658  if (winfo->error_mq_handle)
659  {
661  winfo->error_mq_handle = NULL;
662  }
663 
664  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
665 
666  worker = &LogicalRepCtx->workers[slot_no];
667  Assert(isParallelApplyWorker(worker));
668 
669  /*
670  * Only stop the worker if the generation matches and the worker is alive.
671  */
672  if (worker->generation == generation && worker->proc)
673  logicalrep_worker_stop_internal(worker, SIGINT);
674 
675  LWLockRelease(LogicalRepWorkerLock);
676 }
unsigned short uint16
Definition: c.h:494
int max_logical_replication_workers
Definition: launcher.c:57
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition: launcher.c:533
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:76
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
@ LW_SHARED
Definition: lwlock.h:117
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:844
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:73
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared

References Assert(), ParallelApplyWorkerInfo::error_mq_handle, LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SpinLockAcquire, SpinLockRelease, and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 854 of file launcher.c.

855 {
856  int i;
857  int res = 0;
858 
859  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
860 
861  /* Search for attached worker for a given subscription id. */
862  for (i = 0; i < max_logical_replication_workers; i++)
863  {
865 
866  if (isTablesyncWorker(w) && w->subid == subid)
867  res++;
868  }
869 
870  return res;
871 }
int i
Definition: isn.c:73
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1920

References Assert(), i, isTablesyncWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 713 of file launcher.c.

714 {
715  /* Block concurrent access. */
716  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
717 
718  Assert(slot >= 0 && slot < max_logical_replication_workers);
720 
722  {
723  LWLockRelease(LogicalRepWorkerLock);
724  ereport(ERROR,
725  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
726  errmsg("logical replication worker slot %d is empty, cannot attach",
727  slot)));
728  }
729 
731  {
732  LWLockRelease(LogicalRepWorkerLock);
733  ereport(ERROR,
734  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
735  errmsg("logical replication worker slot %d is already used by "
736  "another worker, cannot attach", slot)));
737  }
738 
741 
742  LWLockRelease(LogicalRepWorkerLock);
743 }
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:824
@ LW_EXCLUSIVE
Definition: lwlock.h:116
PGPROC * MyProc
Definition: proc.c:66

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_worker_find()

LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 249 of file launcher.c.

250 {
251  int i;
252  LogicalRepWorker *res = NULL;
253 
254  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
255 
256  /* Search for attached worker for a given subscription id. */
257  for (i = 0; i < max_logical_replication_workers; i++)
258  {
260 
261  /* Skip parallel apply workers. */
262  if (isParallelApplyWorker(w))
263  continue;
264 
265  if (w->in_use && w->subid == subid && w->relid == relid &&
266  (!only_running || w->proc))
267  {
268  res = w;
269  break;
270  }
271  }
272 
273  return res;
274 }

References Assert(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( LogicalRepWorkerType  wtype,
Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm 
)

Definition at line 306 of file launcher.c.

309 {
310  BackgroundWorker bgw;
311  BackgroundWorkerHandle *bgw_handle;
312  uint16 generation;
313  int i;
314  int slot = 0;
315  LogicalRepWorker *worker = NULL;
316  int nsyncworkers;
317  int nparallelapplyworkers;
319  bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
320  bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
321 
322  /*----------
323  * Sanity checks:
324  * - must be valid worker type
325  * - tablesync workers are only ones to have relid
326  * - parallel apply worker is the only kind of subworker
327  */
328  Assert(wtype != WORKERTYPE_UNKNOWN);
329  Assert(is_tablesync_worker == OidIsValid(relid));
330  Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
331 
332  ereport(DEBUG1,
333  (errmsg_internal("starting logical replication worker for subscription \"%s\"",
334  subname)));
335 
336  /* Report this after the initial starting message for consistency. */
337  if (max_replication_slots == 0)
338  ereport(ERROR,
339  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
340  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
341 
342  /*
343  * We need to do the modification of the shared memory under lock so that
344  * we have consistent view.
345  */
346  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
347 
348 retry:
349  /* Find unused worker slot. */
350  for (i = 0; i < max_logical_replication_workers; i++)
351  {
353 
354  if (!w->in_use)
355  {
356  worker = w;
357  slot = i;
358  break;
359  }
360  }
361 
362  nsyncworkers = logicalrep_sync_worker_count(subid);
363 
365 
366  /*
367  * If we didn't find a free slot, try to do garbage collection. The
368  * reason we do this is because if some worker failed to start up and its
369  * parent has crashed while waiting, the in_use state was never cleared.
370  */
371  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
372  {
373  bool did_cleanup = false;
374 
375  for (i = 0; i < max_logical_replication_workers; i++)
376  {
378 
379  /*
380  * If the worker was marked in use but didn't manage to attach in
381  * time, clean it up.
382  */
383  if (w->in_use && !w->proc &&
386  {
387  elog(WARNING,
388  "logical replication worker for subscription %u took too long to start; canceled",
389  w->subid);
390 
392  did_cleanup = true;
393  }
394  }
395 
396  if (did_cleanup)
397  goto retry;
398  }
399 
400  /*
401  * We don't allow to invoke more sync workers once we have reached the
402  * sync worker limit per subscription. So, just return silently as we
403  * might get here because of an otherwise harmless race condition.
404  */
405  if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
406  {
407  LWLockRelease(LogicalRepWorkerLock);
408  return false;
409  }
410 
411  nparallelapplyworkers = logicalrep_pa_worker_count(subid);
412 
413  /*
414  * Return false if the number of parallel apply workers reached the limit
415  * per subscription.
416  */
417  if (is_parallel_apply_worker &&
418  nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
419  {
420  LWLockRelease(LogicalRepWorkerLock);
421  return false;
422  }
423 
424  /*
425  * However if there are no more free worker slots, inform user about it
426  * before exiting.
427  */
428  if (worker == NULL)
429  {
430  LWLockRelease(LogicalRepWorkerLock);
432  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
433  errmsg("out of logical replication worker slots"),
434  errhint("You might need to increase %s.", "max_logical_replication_workers")));
435  return false;
436  }
437 
438  /* Prepare the worker slot. */
439  worker->type = wtype;
440  worker->launch_time = now;
441  worker->in_use = true;
442  worker->generation++;
443  worker->proc = NULL;
444  worker->dbid = dbid;
445  worker->userid = userid;
446  worker->subid = subid;
447  worker->relid = relid;
448  worker->relstate = SUBREL_STATE_UNKNOWN;
450  worker->stream_fileset = NULL;
451  worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
452  worker->parallel_apply = is_parallel_apply_worker;
453  worker->last_lsn = InvalidXLogRecPtr;
456  worker->reply_lsn = InvalidXLogRecPtr;
457  TIMESTAMP_NOBEGIN(worker->reply_time);
458 
459  /* Before releasing lock, remember generation for future identification. */
460  generation = worker->generation;
461 
462  LWLockRelease(LogicalRepWorkerLock);
463 
464  /* Register the new dynamic worker. */
465  memset(&bgw, 0, sizeof(bgw));
469  snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
470 
471  switch (worker->type)
472  {
473  case WORKERTYPE_APPLY:
474  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
476  "logical replication apply worker for subscription %u",
477  subid);
478  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
479  break;
480 
482  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
484  "logical replication parallel apply worker for subscription %u",
485  subid);
486  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
487 
488  memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
489  break;
490 
492  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
494  "logical replication tablesync worker for subscription %u sync %u",
495  subid,
496  relid);
497  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
498  break;
499 
500  case WORKERTYPE_UNKNOWN:
501  /* Should never happen. */
502  elog(ERROR, "unknown worker type");
503  }
504 
507  bgw.bgw_main_arg = Int32GetDatum(slot);
508 
509  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
510  {
511  /* Failed to start worker, so clean up the worker slot. */
512  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
513  Assert(generation == worker->generation);
515  LWLockRelease(LogicalRepWorkerLock);
516 
518  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
519  errmsg("out of background worker slots"),
520  errhint("You might need to increase %s.", "max_worker_processes")));
521  return false;
522  }
523 
524  /* Now wait until it attaches. */
525  return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
526 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1785
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1613
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:986
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
#define OidIsValid(objectId)
Definition: c.h:764
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
uint32 dsm_handle
Definition: dsm_impl.h:55
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errhint(const char *fmt,...)
Definition: elog.c:1316
#define WARNING
Definition: elog.h:36
int MyProcPid
Definition: globals.c:44
static int logicalrep_pa_worker_count(Oid subid)
Definition: launcher.c:878
int max_sync_workers_per_subscription
Definition: launcher.c:58
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:189
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:854
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:59
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:792
#define InvalidPid
Definition: miscadmin.h:32
NameData subname
#define snprintf
Definition: port.h:238
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
int max_replication_slots
Definition: slot.c:102
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
int wal_receiver_timeout
Definition: walreceiver.c:91
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog(), ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_replication_slots, max_sync_workers_per_subscription, MAXPGPATH, MyProcPid, now(), OidIsValid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::type, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by ApplyLauncherMain(), pa_launch_parallel_worker(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 615 of file launcher.c.

616 {
617  LogicalRepWorker *worker;
618 
619  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
620 
621  worker = logicalrep_worker_find(subid, relid, false);
622 
623  if (worker)
624  {
625  Assert(!isParallelApplyWorker(worker));
626  logicalrep_worker_stop_internal(worker, SIGTERM);
627  }
628 
629  LWLockRelease(LogicalRepWorkerLock);
630 }
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:249

References Assert(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 682 of file launcher.c.

683 {
684  LogicalRepWorker *worker;
685 
686  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
687 
688  worker = logicalrep_worker_find(subid, relid, true);
689 
690  if (worker)
692 
693  LWLockRelease(LogicalRepWorkerLock);
694 }
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by apply_handle_stream_start(), and pg_attribute_noreturn().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 702 of file launcher.c.

703 {
704  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
705 
706  SetLatch(&worker->proc->procLatch);
707 }
void SetLatch(Latch *latch)
Definition: latch.c:633
Latch procLatch
Definition: proc.h:170

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by AtEOXact_LogicalRepWorkers(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().

◆ logicalrep_workers_find()

List* logicalrep_workers_find ( Oid  subid,
bool  only_running 
)

Definition at line 281 of file launcher.c.

282 {
283  int i;
284  List *res = NIL;
285 
286  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
287 
288  /* Search for attached worker for a given subscription id. */
289  for (i = 0; i < max_logical_replication_workers; i++)
290  {
292 
293  if (w->in_use && w->subid == subid && (!only_running || w->proc))
294  res = lappend(res, w);
295  }
296 
297  return res;
298 }
List * lappend(List *list, void *datum)
Definition: list.c:338
Definition: pg_list.h:54

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().

◆ maybe_reread_subscription()

void maybe_reread_subscription ( void  )

Definition at line 3880 of file worker.c.

3881 {
3882  MemoryContext oldctx;
3884  bool started_tx = false;
3885 
3886  /* When cache state is valid there is nothing to do here. */
3887  if (MySubscriptionValid)
3888  return;
3889 
3890  /* This function might be called inside or outside of transaction. */
3891  if (!IsTransactionState())
3892  {
3894  started_tx = true;
3895  }
3896 
3897  /* Ensure allocations in permanent context. */
3899 
3901 
3902  /*
3903  * Exit if the subscription was removed. This normally should not happen
3904  * as the worker gets killed during DROP SUBSCRIPTION.
3905  */
3906  if (!newsub)
3907  {
3908  ereport(LOG,
3909  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3910  MySubscription->name)));
3911 
3912  /* Ensure we remove no-longer-useful entry for worker's start time */
3913  if (am_leader_apply_worker())
3915 
3916  proc_exit(0);
3917  }
3918 
3919  /* Exit if the subscription was disabled. */
3920  if (!newsub->enabled)
3921  {
3922  ereport(LOG,
3923  (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3924  MySubscription->name)));
3925 
3927  }
3928 
3929  /* !slotname should never happen when enabled is true. */
3930  Assert(newsub->slotname);
3931 
3932  /* two-phase should not be altered */
3933  Assert(newsub->twophasestate == MySubscription->twophasestate);
3934 
3935  /*
3936  * Exit if any parameter that affects the remote connection was changed.
3937  * The launcher will start a new worker but note that the parallel apply
3938  * worker won't restart if the streaming option's value is changed from
3939  * 'parallel' to any other value or the server decides not to stream the
3940  * in-progress transaction.
3941  */
3942  if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3943  strcmp(newsub->name, MySubscription->name) != 0 ||
3944  strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3945  newsub->binary != MySubscription->binary ||
3946  newsub->stream != MySubscription->stream ||
3947  newsub->passwordrequired != MySubscription->passwordrequired ||
3948  strcmp(newsub->origin, MySubscription->origin) != 0 ||
3949  newsub->owner != MySubscription->owner ||
3950  !equal(newsub->publications, MySubscription->publications))
3951  {
3953  ereport(LOG,
3954  (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3955  MySubscription->name)));
3956  else
3957  ereport(LOG,
3958  (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3959  MySubscription->name)));
3960 
3962  }
3963 
3964  /*
3965  * Exit if the subscription owner's superuser privileges have been
3966  * revoked.
3967  */
3968  if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3969  {
3971  ereport(LOG,
3972  errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3973  MySubscription->name));
3974  else
3975  ereport(LOG,
3976  errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3977  MySubscription->name));
3978 
3980  }
3981 
3982  /* Check for other changes that should never happen too. */
3983  if (newsub->dbid != MySubscription->dbid)
3984  {
3985  elog(ERROR, "subscription %u changed unexpectedly",
3987  }
3988 
3989  /* Clean old subscription info and switch to new one. */
3992 
3993  MemoryContextSwitchTo(oldctx);
3994 
3995  /* Change synchronous commit according to the user's wishes */
3996  SetConfigOption("synchronous_commit", MySubscription->synccommit,
3998 
3999  if (started_tx)
4001 
4002  MySubscriptionValid = true;
4003 }
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void FreeSubscription(Subscription *sub)
static color newsub(struct colormap *cm, color co)
Definition: regc_color.c:389
bool IsTransactionState(void)
Definition: xact.c:378

References am_leader_apply_worker(), am_parallel_apply_worker(), apply_worker_exit(), ApplyContext, ApplyLauncherForgetWorkerStartTime(), Assert(), Subscription::binary, CommitTransactionCommand(), Subscription::conninfo, Subscription::dbid, elog(), equal(), ereport, errmsg(), ERROR, FreeSubscription(), GetSubscription(), IsTransactionState(), LOG, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, MySubscriptionValid, Subscription::name, newsub(), Subscription::origin, Subscription::owner, Subscription::ownersuperuser, Subscription::passwordrequired, PGC_BACKEND, PGC_S_OVERRIDE, proc_exit(), Subscription::publications, SetConfigOption(), Subscription::slotname, StartTransactionCommand(), Subscription::stream, LogicalRepWorker::subid, Subscription::synccommit, and Subscription::twophasestate.

Referenced by apply_handle_commit_internal(), begin_replication_step(), LogicalRepApplyLoop(), and pa_can_start().

◆ pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid)

Definition at line 470 of file applyparallelworker.c.

471 {
472  bool found;
473  ParallelApplyWorkerInfo *winfo = NULL;
475 
476  if (!pa_can_start())
477  return;
478 
479  winfo = pa_launch_parallel_worker();
480  if (!winfo)
481  return;
482 
483  /* First time through, initialize parallel apply worker state hashtable. */
485  {
486  HASHCTL ctl;
487 
488  MemSet(&ctl, 0, sizeof(ctl));
489  ctl.keysize = sizeof(TransactionId);
490  ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
491  ctl.hcxt = ApplyContext;
492 
493  ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
494  16, &ctl,
496  }
497 
498  /* Create an entry for the requested transaction. */
499  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
500  if (found)
501  elog(ERROR, "hash table corrupted");
502 
503  /* Update the transaction information in shared memory. */
504  SpinLockAcquire(&winfo->shared->mutex);
506  winfo->shared->xid = xid;
507  SpinLockRelease(&winfo->shared->mutex);
508 
509  winfo->in_use = true;
510  winfo->serialize_changes = false;
511  entry->winfo = winfo;
512  entry->xid = xid;
513 }
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
static HTAB * ParallelApplyTxnHash
#define MemSet(start, val, len)
Definition: c.h:1009
uint32 TransactionId
Definition: c.h:641
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
ParallelApplyWorkerInfo * winfo
ParallelTransState xact_state

References ApplyContext, elog(), HASHCTL::entrysize, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, ParallelApplyWorkerInfo::in_use, HASHCTL::keysize, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, ParallelApplyWorkerEntry::xid, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

◆ pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1592 of file applyparallelworker.c.

1593 {
1595 
1596  /*
1597  * It is only possible to not have any pending stream chunks when we are
1598  * applying spooled messages.
1599  */
1601  {
1603  return;
1604 
1605  elog(ERROR, "invalid pending streaming chunk 0");
1606  }
1607 
1609  {
1612  }
1613 }
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_has_spooled_message_pending()
ParallelApplyWorkerShared * MyParallelShared
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:396
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:236
#define AccessShareLock
Definition: lockdefs.h:36
pg_atomic_uint32 pending_stream_count

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog(), ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

◆ pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 623 of file applyparallelworker.c.

624 {
625  ListCell *lc;
626 
627  foreach(lc, ParallelApplyWorkerPool)
628  {
630 
631  if (winfo->error_mq_handle)
632  {
634  winfo->error_mq_handle = NULL;
635  }
636  }
637 }
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition: pg_list.h:172

References ParallelApplyWorkerInfo::error_mq_handle, lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

◆ pa_find_worker()

ParallelApplyWorkerInfo* pa_find_worker ( TransactionId  xid)

Definition at line 519 of file applyparallelworker.c.

520 {
521  bool found;
523 
524  if (!TransactionIdIsValid(xid))
525  return NULL;
526 
528  return NULL;
529 
530  /* Return the cached parallel apply worker if valid. */
532  return stream_apply_worker;
533 
534  /* Find an entry for the requested transaction. */
535  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
536  if (found)
537  {
538  /* The worker must not have exited. */
539  Assert(entry->winfo->in_use);
540  return entry->winfo;
541  }
542 
543  return NULL;
544 }
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition: hsearch.h:113

References Assert(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

◆ pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1403 of file applyparallelworker.c.

1404 {
1405  /*
1406  * Reset subxactlist to NIL. We don't need to free the list explicitly
1407  * as the allocated memory will be freed at the transaction end.
1408  */
1409  subxactlist = NIL;
1410 }
static List * subxactlist

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

◆ pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfo winfo,
Size  nbytes,
const void *  data 
)

Definition at line 1147 of file applyparallelworker.c.

1148 {
1149  int rc;
1150  shm_mq_result result;
1151  TimestampTz startTime = 0;
1152 
1154  Assert(!winfo->serialize_changes);
1155 
1156  /*
1157  * We don't try to send data to parallel worker for 'immediate' mode. This
1158  * is primarily used for testing purposes.
1159  */
1161  return false;
1162 
1163 /*
1164  * This timeout is a bit arbitrary but testing revealed that it is sufficient
1165  * to send the message unless the parallel apply worker is waiting on some
1166  * lock or there is a serious resource crunch. See the comments atop this file
1167  * to know why we are using a non-blocking way to send the message.
1168  */
1169 #define SHM_SEND_RETRY_INTERVAL_MS 1000
1170 #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1171 
1172  for (;;)
1173  {
1174  result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1175 
1176  if (result == SHM_MQ_SUCCESS)
1177  return true;
1178  else if (result == SHM_MQ_DETACHED)
1179  ereport(ERROR,
1180  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1181  errmsg("could not send data to shared-memory queue")));
1182 
1183  Assert(result == SHM_MQ_WOULD_BLOCK);
1184 
1185  /* Wait before retrying. */
1186  rc = WaitLatch(MyLatch,
1189  WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1190 
1191  if (rc & WL_LATCH_SET)
1192  {
1195  }
1196 
1197  if (startTime == 0)
1198  startTime = GetCurrentTimestamp();
1199  else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1201  return false;
1202  }
1203 }
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
#define unlikely(x)
Definition: c.h:300
struct Latch * MyLatch
Definition: globals.c:58
void ResetLatch(Latch *latch)
Definition: latch.c:725
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:518
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
const void * data
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:28
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:330
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
shm_mq_handle * mq_handle

References Assert(), CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_set_fileset_state()

◆ pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfo *  winfo)

Definition at line 1335 of file applyparallelworker.c.

1336 {
1337  stream_apply_worker = winfo;
1338 }

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

◆ pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerShared *  wshared,
ParallelTransState  xact_state 
)

◆ pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1363 of file applyparallelworker.c.

1364 {
1365  if (current_xid != top_xid &&
1366  !list_member_xid(subxactlist, current_xid))
1367  {
1368  MemoryContext oldctx;
1369  char spname[NAMEDATALEN];
1370 
1371  pa_savepoint_name(MySubscription->oid, current_xid,
1372  spname, sizeof(spname));
1373 
1374  elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1375 
1376  /* We must be in transaction block to define the SAVEPOINT. */
1377  if (!IsTransactionBlock())
1378  {
1379  if (!IsTransactionState())
1381 
1384  }
1385 
1386  DefineSavepoint(spname);
1387 
1388  /*
1389  * CommitTransactionCommand is needed to start a subtransaction after
1390  * issuing a SAVEPOINT inside a transaction block (see
1391  * StartSubTransaction()).
1392  */
1394 
1396  subxactlist = lappend_xid(subxactlist, current_xid);
1397  MemoryContextSwitchTo(oldctx);
1398  }
1399 }
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:392
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:741
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition: xact.c:4219
bool IsTransactionBlock(void)
Definition: xact.c:4816
void BeginTransactionBlock(void)
Definition: xact.c:3770

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog(), IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

◆ pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortData *  abort_data)

Definition at line 1417 of file applyparallelworker.c.

1418 {
1419  TransactionId xid = abort_data->xid;
1420  TransactionId subxid = abort_data->subxid;
1421 
1422  /*
1423  * Update origin state so we can restart streaming from correct position
1424  * in case of crash.
1425  */
1428 
1429  /*
1430  * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1431  * just free the subxactlist.
1432  */
1433  if (subxid == xid)
1434  {
1436 
1437  /*
1438  * Release the lock as we might be processing an empty streaming
1439  * transaction in which case the lock won't be released during
1440  * transaction rollback.
1441  *
1442  * Note that it's ok to release the transaction lock before aborting
1443  * the transaction because even if the parallel apply worker dies due
1444  * to crash or some other reason, such a transaction would still be
1445  * considered aborted.
1446  */
1448 
1450 
1451  if (IsTransactionBlock())
1452  {
1453  EndTransactionBlock(false);
1455  }
1456 
1458 
1460  }
1461  else
1462  {
1463  /* OK, so it's a subxact. Rollback to the savepoint. */
1464  int i;
1465  char spname[NAMEDATALEN];
1466 
1467  pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1468 
1469  elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1470 
1471  /*
1472  * Search the subxactlist, determine the offset tracked for the
1473  * subxact, and truncate the list.
1474  *
1475  * Note that for an empty sub-transaction we won't find the subxid
1476  * here.
1477  */
1478  for (i = list_length(subxactlist) - 1; i >= 0; i--)
1479  {
1481 
1482  if (xid_tmp == subxid)
1483  {
1484  RollbackToSavepoint(spname);
1487  break;
1488  }
1489  }
1490  }
1491 }
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
@ STATE_IDLE
List * list_truncate(List *list, int new_size)
Definition: list.c:630
#define AccessExclusiveLock
Definition: lockdefs.h:43
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
static int list_length(const List *l)
Definition: pg_list.h:152
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
void RollbackToSavepoint(const char *name)
Definition: xact.c:4413
bool EndTransactionBlock(bool chain)
Definition: xact.c:3890
void AbortCurrentTransaction(void)
Definition: xact.c:3305

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog(), EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfo *  winfo,
bool  stream_locked 
)

Definition at line 1212 of file applyparallelworker.c.

1214 {
1215  ereport(LOG,
1216  (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1217  winfo->shared->xid)));
1218 
1219  /*
1220  * The parallel apply worker could be stuck for some reason (say waiting
1221  * on some lock by other backend), so stop trying to send data directly to
1222  * it and start serializing data to the file instead.
1223  */
1224  winfo->serialize_changes = true;
1225 
1226  /* Initialize the stream fileset. */
1227  stream_start_internal(winfo->shared->xid, true);
1228 
1229  /*
1230  * Acquires the stream lock if not already to make sure that the parallel
1231  * apply worker will wait for the leader to release the stream lock until
1232  * the end of the transaction.
1233  */
1234  if (!stream_locked)
1236 
1238 }
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1447

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

◆ pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

◆ pa_unlock_transaction()

◆ pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfo *  winfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1619 of file applyparallelworker.c.

1620 {
1622 
1623  /*
1624  * Unlock the shared object lock so that parallel apply worker can
1625  * continue to receive and apply changes.
1626  */
1628 
1629  /*
1630  * Wait for that worker to finish. This is necessary to maintain commit
1631  * order which avoids failures due to transaction dependencies and
1632  * deadlocks.
1633  */
1634  pa_wait_for_xact_finish(winfo);
1635 
1636  if (!XLogRecPtrIsInvalid(remote_lsn))
1637  store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1638 
1639  pa_free_worker(winfo);
1640 }
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3454

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 650 of file tablesync.c.

651 {
652  switch (MyLogicalRepWorker->type)
653  {
655 
656  /*
657  * Skip for parallel apply workers because they only operate on
658  * tables that are in a READY state. See pa_can_start() and
659  * should_apply_changes_for_rel().
660  */
661  break;
662 
664  process_syncing_tables_for_sync(current_lsn);
665  break;
666 
667  case WORKERTYPE_APPLY:
669  break;
670 
671  case WORKERTYPE_UNKNOWN:
672  /* Should never happen. */
673  elog(ERROR, "Unknown worker type");
674  }
675 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:411
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:288

References elog(), ERROR, MyLogicalRepWorker, process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ ReplicationOriginNameForLogicalRep()

void ReplicationOriginNameForLogicalRep ( Oid  suboid,
Oid  relid,
char *  originname,
Size  szoriginname 
)

Definition at line 446 of file worker.c.

448 {
449  if (OidIsValid(relid))
450  {
451  /* Replication origin name for tablesync workers. */
452  snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
453  }
454  else
455  {
456  /* Replication origin name for non-tablesync workers. */
457  snprintf(originname, szoriginname, "pg_%u", suboid);
458  }
459 }

References OidIsValid, and snprintf.

Referenced by AlterSubscription(), AlterSubscription_refresh(), CreateSubscription(), DropSubscription(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), run_apply_worker(), and run_tablesync_worker().

◆ set_apply_error_context_origin()

void set_apply_error_context_origin ( char *  originname)

Definition at line 5053 of file worker.c.

5054 {
5056  originname);
5057 }
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1631

References apply_error_callback_arg, ApplyContext, MemoryContextStrdup(), and ApplyErrorCallbackArg::origin_name.

Referenced by ParallelApplyWorkerMain(), run_apply_worker(), and run_tablesync_worker().

◆ set_stream_options()

void set_stream_options ( WalRcvStreamOptions *  options,
char *  slotname,
XLogRecPtr *  origin_startpos 
)

Definition at line 4356 of file worker.c.

4359 {
4360  int server_version;
4361 
4362  options->logical = true;
4363  options->startpoint = *origin_startpos;
4364  options->slotname = slotname;
4365 
4367  options->proto.logical.proto_version =
4372 
4373  options->proto.logical.publication_names = MySubscription->publications;
4374  options->proto.logical.binary = MySubscription->binary;
4375 
4376  /*
4377  * Assign the appropriate option value for streaming option according to
4378  * the 'streaming' mode and the publisher's ability to support that mode.
4379  */
4380  if (server_version >= 160000 &&
4382  {
4383  options->proto.logical.streaming_str = "parallel";
4385  }
4386  else if (server_version >= 140000 &&
4388  {
4389  options->proto.logical.streaming_str = "on";
4391  }
4392  else
4393  {
4394  options->proto.logical.streaming_str = NULL;
4396  }
4397 
4398  options->proto.logical.twophase = false;
4399  options->proto.logical.origin = pstrdup(MySubscription->origin);
4400 }
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:314
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1644
static int server_version
Definition: pg_dumpall.c:110
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define walrcv_server_version(conn)
Definition: walreceiver.h:420

References Subscription::binary, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_PARALLEL, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::origin, LogicalRepWorker::parallel_apply, pstrdup(), Subscription::publications, server_version, Subscription::stream, and walrcv_server_version.

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ SetupApplyOrSyncWorker()

void SetupApplyOrSyncWorker ( int  worker_slot)

Definition at line 4662 of file worker.c.

4663 {
4664  /* Attach to slot */
4665  logicalrep_worker_attach(worker_slot);
4666 
4668 
4669  /* Setup signal handling */
4671  pqsignal(SIGTERM, die);
4673 
4674  /*
4675  * We don't currently need any ResourceOwner in a walreceiver process, but
4676  * if we did, we could call CreateAuxProcessResourceOwner here.
4677  */
4678 
4679  /* Initialise stats to a sanish value */
4682 
4683  /* Load the libpq-specific functions */
4684  load_file("libpqwalreceiver", false);
4685 
4687 
4688  /* Connect to the origin and start the replication. */
4689  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4691 
4692  /*
4693  * Setup callback for syscache so that we know when something changes in
4694  * the subscription relation state.
4695  */
4698  (Datum) 0);
4699 }
void InitializeLogRepWorker(void)
Definition: worker.c:4579
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void logicalrep_worker_attach(int slot)
Definition: launcher.c:713
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5636
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:274
#define SIGHUP
Definition: win32_port.h:168

References am_leader_apply_worker(), am_tablesync_worker(), Assert(), BackgroundWorkerUnblockSignals(), CacheRegisterSyscacheCallback(), Subscription::conninfo, DEBUG1, die, elog(), GetCurrentTimestamp(), InitializeLogRepWorker(), invalidate_syncing_table_states(), LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), logicalrep_worker_attach(), MyLogicalRepWorker, MySubscription, pqsignal(), LogicalRepWorker::reply_time, SIGHUP, SignalHandlerForConfigReload(), and SUBSCRIPTIONRELMAP.

Referenced by ApplyWorkerMain(), and TablesyncWorkerMain().

◆ start_apply()

void start_apply ( XLogRecPtr  origin_startpos)

Definition at line 4443 of file worker.c.

4444 {
4445  PG_TRY();
4446  {
4447  LogicalRepApplyLoop(origin_startpos);
4448  }
4449  PG_CATCH();
4450  {
4453  else
4454  {
4455  /*
4456  * Report the worker failed while applying changes. Abort the
4457  * current transaction so that the stats message is sent in an
4458  * idle state.
4459  */
4462 
4463  PG_RE_THROW();
4464  }
4465  }
4466  PG_END_TRY();
4467 }
static void LogicalRepApplyLoop(XLogRecPtr last_received)
Definition: worker.c:3496
void DisableSubscriptionAndExit(void)
Definition: worker.c:4723
#define PG_RE_THROW()
Definition: elog.h:411
#define PG_TRY(...)
Definition: elog.h:370
#define PG_END_TRY(...)
Definition: elog.h:395
#define PG_CATCH(...)
Definition: elog.h:380

References AbortOutOfAnyTransaction(), am_tablesync_worker(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepApplyLoop(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by run_apply_worker(), and run_tablesync_worker().

◆ store_flush_position()

void store_flush_position ( XLogRecPtr  remote_lsn,
XLogRecPtr  local_lsn 
)

Definition at line 3454 of file worker.c.

3455 {
3456  FlushPosition *flushpos;
3457 
3458  /*
3459  * Skip for parallel apply workers, because the lsn_mapping is maintained
3460  * by the leader apply worker.
3461  */
3463  return;
3464 
3465  /* Need to do this in permanent context */
3467 
3468  /* Track commit lsn */
3469  flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3470  flushpos->local_end = local_lsn;
3471  flushpos->remote_end = remote_lsn;
3472 
3473  dlist_push_tail(&lsn_mapping, &flushpos->node);
3475 }
static dlist_head lsn_mapping
Definition: worker.c:222
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
dlist_node node
Definition: worker.c:217
XLogRecPtr remote_end
Definition: worker.c:219
XLogRecPtr local_end
Definition: worker.c:218

References am_parallel_apply_worker(), ApplyContext, ApplyMessageContext, dlist_push_tail(), FlushPosition::local_end, lsn_mapping, MemoryContextSwitchTo(), FlushPosition::node, palloc(), and FlushPosition::remote_end.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_prepare(), and pa_xact_finish().

◆ stream_cleanup_files()

void stream_cleanup_files ( Oid  subid,
TransactionId  xid 
)

Definition at line 4223 of file worker.c.

4224 {
4225  char path[MAXPGPATH];
4226 
4227  /* Delete the changes file. */
4228  changes_filename(path, subid, xid);
4230 
4231  /* Delete the subxact file, if it exists. */
4232  subxact_filename(path, subid, xid);
4234 }
static void subxact_filename(char *path, Oid subid, TransactionId xid)
Definition: worker.c:4202
void BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
Definition: buffile.c:364

References BufFileDeleteFileSet(), changes_filename(), MAXPGPATH, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and subxact_filename().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_free_worker_info(), and stream_abort_internal().

◆ stream_start_internal()

void stream_start_internal ( TransactionId  xid,
bool  first_segment 
)

Definition at line 1447 of file worker.c.

1448 {
1450 
1451  /*
1452  * Initialize the worker's stream_fileset if we haven't yet. This will be
1453  * used for the entire duration of the worker so create it in a permanent
1454  * context. We create this on the very first streaming message from any
1455  * transaction and then use it for this and other streaming transactions.
1456  * Now, we could create a fileset at the start of the worker as well but
1457  * then we won't be sure that it will ever be used.
1458  */
1460  {
1461  MemoryContext oldctx;
1462 
1464 
1467 
1468  MemoryContextSwitchTo(oldctx);
1469  }
1470 
1471  /* Open the spool file for this transaction. */
1472  stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1473 
1474  /* If this is not the first segment, open existing subxact file. */
1475  if (!first_segment)
1477 
1479 }
static void stream_open_file(Oid subid, TransactionId xid, bool first_segment)
Definition: worker.c:4247
static void subxact_info_read(Oid subid, TransactionId xid)
Definition: worker.c:4073
void FileSetInit(FileSet *fileset)
Definition: fileset.c:55

References ApplyContext, begin_replication_step(), end_replication_step(), FileSetInit(), MemoryContextSwitchTo(), MyLogicalRepWorker, palloc(), LogicalRepWorker::stream_fileset, stream_open_file(), LogicalRepWorker::subid, and subxact_info_read().

Referenced by apply_handle_stream_start(), pa_switch_to_partial_serialize(), and stream_open_and_write_change().

◆ stream_stop_internal()

void stream_stop_internal ( TransactionId  xid)

Definition at line 1621 of file worker.c.

1622 {
1623  /*
1624  * Serialize information about subxacts for the toplevel transaction, then
1625  * close the stream messages spool file.
1626  */
1629 
1630  /* We must be in a valid transaction state */
1632 
1633  /* Commit the per-stream transaction */
1635 
1636  /* Reset per-stream context */
1638 }
static void subxact_info_write(Oid subid, TransactionId xid)
Definition: worker.c:4024
static MemoryContext LogicalStreamingContext
Definition: worker.c:312

References Assert(), CommitTransactionCommand(), IsTransactionState(), LogicalStreamingContext, MemoryContextReset(), MyLogicalRepWorker, stream_close_file(), LogicalRepWorker::subid, and subxact_info_write().

Referenced by apply_handle_stream_stop(), and stream_open_and_write_change().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1728 of file tablesync.c.

1729 {
1730  Relation rel;
1731  HeapTuple tup;
1732  bool nulls[Natts_pg_subscription];
1733  bool replaces[Natts_pg_subscription];
1734  Datum values[Natts_pg_subscription];
1735 
1737  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1738  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1739 
1740  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1742  if (!HeapTupleIsValid(tup))
1743  elog(ERROR,
1744  "cache lookup failed for subscription oid %u",
1745  suboid);
1746 
1747  /* Form a new tuple. */
1748  memset(values, 0, sizeof(values));
1749  memset(nulls, false, sizeof(nulls));
1750  memset(replaces, false, sizeof(replaces));
1751 
1752  /* And update/set two_phase state */
1753  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1754  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1755 
1756  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1757  values, nulls, replaces);
1758  CatalogTupleUpdate(rel, &tup->t_self, tup);
1759 
1760  heap_freetuple(tup);
1762 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
#define RowExclusiveLock
Definition: lockdefs.h:38
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:182
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

Variable Documentation

◆ apply_error_context_stack

PGDLLIMPORT ErrorContextCallback* apply_error_context_stack
extern

Definition at line 306 of file worker.c.

Referenced by HandleParallelApplyMessage(), and LogicalRepApplyLoop().

◆ ApplyContext

◆ ApplyMessageContext

◆ in_remote_transaction

◆ InitializingApplyWorker

PGDLLIMPORT bool InitializingApplyWorker
extern

Definition at line 336 of file worker.c.

Referenced by ApplyWorkerMain(), logicalrep_worker_onexit(), and ParallelApplyWorkerMain().

◆ LogRepWorkerWalRcvConn

◆ MyLogicalRepWorker

◆ MyParallelShared

◆ MySubscription