PostgreSQL Source Code  git master
logicalworker.h File Reference
#include <signal.h>
Include dependency graph for logicalworker.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
void ParallelApplyWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void HandleParallelApplyMessageInterrupt (void)
 
void HandleParallelApplyMessages (void)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 

Variables

PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4473 of file worker.c.

4474 {
4475  int worker_slot = DatumGetInt32(main_arg);
4476  char originname[NAMEDATALEN];
4477  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4478  char *myslotname = NULL;
4480  int server_version;
4481 
4482  /* Attach to slot */
4483  logicalrep_worker_attach(worker_slot);
4484 
4485  /* Setup signal handling */
4487  pqsignal(SIGTERM, die);
4489 
4490  /*
4491  * We don't currently need any ResourceOwner in a walreceiver process, but
4492  * if we did, we could call CreateAuxProcessResourceOwner here.
4493  */
4494 
4495  /* Initialise stats to a sanish value */
4498 
4499  /* Load the libpq-specific functions */
4500  load_file("libpqwalreceiver", false);
4501 
4503 
4504  /* Connect to the origin and start the replication. */
4505  elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4507 
4508  if (am_tablesync_worker())
4509  {
4510  start_table_sync(&origin_startpos, &myslotname);
4511 
4514  originname,
4515  sizeof(originname));
4516  set_apply_error_context_origin(originname);
4517  }
4518  else
4519  {
4520  /* This is the leader apply worker */
4521  RepOriginId originid;
4522  TimeLineID startpointTLI;
4523  char *err;
4524 
4525  myslotname = MySubscription->slotname;
4526 
4527  /*
4528  * This shouldn't happen if the subscription is enabled, but guard
4529  * against DDL bugs or manual catalog changes. (libpqwalreceiver will
4530  * crash if slot is NULL.)
4531  */
4532  if (!myslotname)
4533  ereport(ERROR,
4534  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4535  errmsg("subscription has no replication slot set")));
4536 
4537  /* Setup replication origin tracking. */
4540  originname, sizeof(originname));
4541  originid = replorigin_by_name(originname, true);
4542  if (!OidIsValid(originid))
4543  originid = replorigin_create(originname);
4544  replorigin_session_setup(originid, 0);
4545  replorigin_session_origin = originid;
4546  origin_startpos = replorigin_session_get_progress(false);
4548 
4550  MySubscription->name, &err);
4551  if (LogRepWorkerWalRcvConn == NULL)
4552  ereport(ERROR,
4553  (errcode(ERRCODE_CONNECTION_FAILURE),
4554  errmsg("could not connect to the publisher: %s", err)));
4555 
4556  /*
4557  * We don't really use the output identify_system for anything but it
4558  * does some initializations on the upstream so let's still call it.
4559  */
4560  (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4561 
4562  set_apply_error_context_origin(originname);
4563  }
4564 
4565  /*
4566  * Setup callback for syscache so that we know when something changes in
4567  * the subscription relation state.
4568  */
4571  (Datum) 0);
4572 
4573  /* Build logical replication streaming options. */
4574  options.logical = true;
4575  options.startpoint = origin_startpos;
4576  options.slotname = myslotname;
4577 
4579  options.proto.logical.proto_version =
4584 
4585  options.proto.logical.publication_names = MySubscription->publications;
4586  options.proto.logical.binary = MySubscription->binary;
4587 
4588  /*
4589  * Assign the appropriate option value for streaming option according to
4590  * the 'streaming' mode and the publisher's ability to support that mode.
4591  */
4592  if (server_version >= 160000 &&
4594  {
4595  options.proto.logical.streaming_str = "parallel";
4597  }
4598  else if (server_version >= 140000 &&
4600  {
4601  options.proto.logical.streaming_str = "on";
4603  }
4604  else
4605  {
4606  options.proto.logical.streaming_str = NULL;
4608  }
4609 
4610  options.proto.logical.twophase = false;
4611  options.proto.logical.origin = pstrdup(MySubscription->origin);
4612 
4613  if (!am_tablesync_worker())
4614  {
4615  /*
4616  * Even when the two_phase mode is requested by the user, it remains
4617  * as the tri-state PENDING until all tablesyncs have reached READY
4618  * state. Only then, can it become ENABLED.
4619  *
4620  * Note: If the subscription has no tables then leave the state as
4621  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4622  * work.
4623  */
4626  {
4627  /* Start streaming with two_phase enabled */
4628  options.proto.logical.twophase = true;
4630 
4635  }
4636  else
4637  {
4639  }
4640 
4641  ereport(DEBUG1,
4642  (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4647  "?")));
4648  }
4649  else
4650  {
4651  /* Start normal logical streaming replication. */
4653  }
4654 
4655  /* Run the main loop. */
4656  start_apply(origin_startpos);
4657 
4658  proc_exit(0);
4659 }
void InitializeApplyWorker(void)
Definition: worker.c:4395
static void start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
Definition: worker.c:4319
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:457
static void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4362
void set_apply_error_context_origin(char *originname)
Definition: worker.c:4996
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:312
Subscription * MySubscription
Definition: worker.c:314
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
#define OidIsValid(objectId)
Definition: c.h:759
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_attach(int slot)
Definition: launcher.c:674
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:41
char * pstrdup(const char *in)
Definition: mcxt.c:1624
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1234
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:110
static char ** options
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
uintptr_t Datum
Definition: postgres.h:64
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
#define InvalidOid
Definition: postgres_ext.h:36
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5660
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
bool AllTablesyncsReady(void)
Definition: tablesync.c:1573
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:272
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1598
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:422
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
#define walrcv_server_version(conn)
Definition: walreceiver.h:418
#define walrcv_identify_system(conn, primary_tli)
Definition: walreceiver.h:416
#define SIGHUP
Definition: win32_port.h:176
static bool am_tablesync_worker(void)
void StartTransactionCommand(void)
Definition: xact.c:2944
void CommitTransactionCommand(void)
Definition: xact.c:3041
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59

References AllTablesyncsReady(), am_tablesync_worker(), BackgroundWorkerUnblockSignals(), Subscription::binary, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), Subscription::conninfo, DatumGetInt32(), DEBUG1, die, elog(), ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, GetCurrentTimestamp(), InitializeApplyWorker(), invalidate_syncing_table_states(), InvalidOid, InvalidXLogRecPtr, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, load_file(), LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_PARALLEL, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_attach(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, Subscription::oid, OidIsValid, options, Subscription::origin, LogicalRepWorker::parallel_apply, pqsignal(), proc_exit(), pstrdup(), Subscription::publications, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, server_version, set_apply_error_context_origin(), SIGHUP, SignalHandlerForConfigReload(), Subscription::slotname, start_apply(), start_table_sync(), StartTransactionCommand(), Subscription::stream, SUBSCRIPTIONRELMAP, Subscription::twophasestate, UpdateTwoPhaseState(), walrcv_connect, walrcv_identify_system, walrcv_server_version, and walrcv_startstreaming.

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 4964 of file worker.c.

4965 {
4966  if (isCommit && on_commit_wakeup_workers_subids != NIL)
4967  {
4968  ListCell *lc;
4969 
4970  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
4971  foreach(lc, on_commit_wakeup_workers_subids)
4972  {
4973  Oid subid = lfirst_oid(lc);
4974  List *workers;
4975  ListCell *lc2;
4976 
4977  workers = logicalrep_workers_find(subid, true);
4978  foreach(lc2, workers)
4979  {
4980  LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
4981 
4982  logicalrep_worker_wakeup_ptr(worker);
4983  }
4984  }
4985  LWLockRelease(LogicalRepWorkerLock);
4986  }
4987 
4988  /* The List storage will be reclaimed automatically in xact cleanup. */
4989  on_commit_wakeup_workers_subids = NIL;
4990 }
static List * on_commit_wakeup_workers_subids
Definition: worker.c:317
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:663
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:281
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:116
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:31
Definition: pg_list.h:54

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )

Definition at line 986 of file applyparallelworker.c.

987 {
988  InterruptPending = true;
989  ParallelApplyMessagePending = true;
990  SetLatch(MyLatch);
991 }
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition: globals.c:30
struct Latch * MyLatch
Definition: globals.c:58
void SetLatch(Latch *latch)
Definition: latch.c:607

References InterruptPending, MyLatch, ParallelApplyMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ HandleParallelApplyMessages()

void HandleParallelApplyMessages ( void  )

Definition at line 1060 of file applyparallelworker.c.

1061 {
1062  ListCell *lc;
1063  MemoryContext oldcontext;
1064 
1065  static MemoryContext hpam_context = NULL;
1066 
1067  /*
1068  * This is invoked from ProcessInterrupts(), and since some of the
1069  * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1070  * for recursive calls if more signals are received while this runs. It's
1071  * unclear that recursive entry would be safe, and it doesn't seem useful
1072  * even if it is safe, so let's block interrupts until done.
1073  */
1074  HOLD_INTERRUPTS();
1075 
1076  /*
1077  * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1078  * don't want to risk leaking data into long-lived contexts, so let's do
1079  * our work here in a private context that we can reset on each use.
1080  */
1081  if (!hpam_context) /* first time through? */
1082  hpam_context = AllocSetContextCreate(TopMemoryContext,
1083  "HandleParallelApplyMessages",
1084  ALLOCSET_DEFAULT_SIZES);
1085  else
1086  MemoryContextReset(hpam_context);
1087 
1088  oldcontext = MemoryContextSwitchTo(hpam_context);
1089 
1090  ParallelApplyMessagePending = false;
1091 
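1092  foreach(lc, ParallelApplyWorkerPool)
1093  {
1094  shm_mq_result res;
1095  Size nbytes;
1096  void *data;
1097  ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
1098 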
1099  /*
1100  * The leader will detach from the error queue and set it to NULL
1101  * before preparing to stop all parallel apply workers, so we don't
1102  * need to handle error messages anymore. See
1103  * logicalrep_worker_detach.
1104  */
1105  if (!winfo->error_mq_handle)
1106  continue;
1107 
1108  res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1109 
1110  if (res == SHM_MQ_WOULD_BLOCK)
1111  continue;
1112  else if (res == SHM_MQ_SUCCESS)
1113  {
1114  StringInfoData msg;
1115 
1116  initStringInfo(&msg);
1117  appendBinaryStringInfo(&msg, data, nbytes);
1118  HandleParallelApplyMessage(&msg);
1119  pfree(msg.data);
1120  }
1121  else
1122  ereport(ERROR,
1123  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1124  errmsg("lost connection to the logical replication parallel apply worker")));
1125  }
1126 
1127  MemoryContextSwitchTo(oldcontext);
1128 
1129  /* Might as well clear the context on our way out */
1130  MemoryContextReset(hpam_context);
1131 
1132  RESUME_INTERRUPTS();
1133 }
static List * ParallelApplyWorkerPool
static void HandleParallelApplyMessage(StringInfo msg)
size_t Size
Definition: c.h:589
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:314
void pfree(void *pointer)
Definition: mcxt.c:1436
MemoryContext TopMemoryContext
Definition: mcxt.c:141
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
const void * data
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:573
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:227
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
shm_mq_handle * error_mq_handle

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HandleParallelApplyMessage(), HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), res, RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4714 of file worker.c.

4715 {
4716  return IsLogicalWorker() && am_parallel_apply_worker();
4717 }
bool IsLogicalWorker(void)
Definition: worker.c:4705
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4705 of file worker.c.

4706 {
4707  return MyLogicalRepWorker != NULL;
4708 }

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

Definition at line 4950 of file worker.c.

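4951 {
4952  MemoryContext oldcxt;
4953 
4954  oldcxt = MemoryContextSwitchTo(TopTransactionContext);
4955  on_commit_wakeup_workers_subids =
4956  list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
4957  MemoryContextSwitchTo(oldcxt);
4958 }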
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1379
MemoryContext TopTransactionContext
Definition: mcxt.c:146

References list_append_unique_oid(), MemoryContextSwitchTo(), on_commit_wakeup_workers_subids, and TopTransactionContext.

Referenced by AlterObjectRename_internal(), AlterSubscription(), and AlterSubscriptionOwner_internal().

◆ ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg)

Definition at line 863 of file applyparallelworker.c.

864 {
866  dsm_handle handle;
867  dsm_segment *seg;
868  shm_toc *toc;
869  shm_mq *mq;
870  shm_mq_handle *mqh;
871  shm_mq_handle *error_mqh;
872  RepOriginId originid;
873  int worker_slot = DatumGetInt32(main_arg);
874  char originname[NAMEDATALEN];
875 
876  /* Setup signal handling. */
879  pqsignal(SIGTERM, die);
881 
882  /*
883  * Attach to the dynamic shared memory segment for the parallel apply, and
884  * find its table of contents.
885  *
886  * Like parallel query, we don't need resource owner by this time. See
887  * ParallelWorkerMain.
888  */
889  memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
890  seg = dsm_attach(handle);
891  if (!seg)
892  ereport(ERROR,
893  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
894  errmsg("unable to map dynamic shared memory segment")));
895 
897  if (!toc)
898  ereport(ERROR,
899  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
900  errmsg("bad magic number in dynamic shared memory segment")));
901 
903 
904  /* Look up the shared information. */
905  shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
906  MyParallelShared = shared;
907 
908  /*
909  * Attach to the message queue.
910  */
911  mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
913  mqh = shm_mq_attach(mq, seg, NULL);
914 
915  /*
916  * Primary initialization is complete. Now, we can attach to our slot.
917  * This is to ensure that the leader apply worker does not write data to
918  * the uninitialized memory queue.
919  */
920  logicalrep_worker_attach(worker_slot);
921 
926 
927  /*
928  * Attach to the error queue.
929  */
932  error_mqh = shm_mq_attach(mq, seg, NULL);
933 
934  pq_redirect_to_shm_mq(seg, error_mqh);
937 
940 
942 
943  /* Setup replication origin tracking. */
946  originname, sizeof(originname));
947  originid = replorigin_by_name(originname, false);
948 
949  /*
950  * The parallel apply worker doesn't need to monopolize this replication
951  * origin which was already acquired by its leader process.
952  */
954  replorigin_session_origin = originid;
956 
957  /*
958  * Setup callback for syscache so that we know when something changes in
959  * the subscription relation state.
960  */
963  (Datum) 0);
964 
965  set_apply_error_context_origin(originname);
966 
968 
969  /*
970  * The parallel apply worker must not get here because the parallel apply
971  * worker will only stop when it receives a SIGTERM or SIGINT from the
972  * leader, or when there is an error. None of these cases will allow the
973  * code to reach here.
974  */
975  Assert(false);
976 }
static void pa_shutdown(int code, Datum arg)
#define PARALLEL_APPLY_KEY_SHARED
ParallelApplyWorkerShared * MyParallelShared
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
#define InvalidBackendId
Definition: backendid.h:23
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:1066
dsm_segment * dsm_attach(dsm_handle h)
Definition: dsm.c:638
uint32 dsm_handle
Definition: dsm_impl.h:55
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
Assert(fmt[strlen(fmt) - 1] !='\n')
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:193
void pq_set_parallel_leader(pid_t pid, BackendId backend_id)
Definition: pqmq.c:78
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition: pqmq.c:53
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:291
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:225
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:207
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition: shm_toc.c:64
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
Definition: shm_mq.c:73

References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeApplyWorker(), invalidate_syncing_table_states(), InvalidBackendId, InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal(), ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), and SUBSCRIPTIONRELMAP.

Variable Documentation

◆ ParallelApplyMessagePending

PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending
extern