PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
logicalworker.h File Reference
#include <signal.h>
Include dependency graph for logicalworker.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
void ParallelApplyWorkerMain (Datum main_arg)
 
void TablesyncWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void HandleParallelApplyMessageInterrupt (void)
 
void ProcessParallelApplyMessages (void)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 

Variables

PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)

Definition at line 4796 of file worker.c.

4797{
4798 int worker_slot = DatumGetInt32(main_arg);
4799
4801
4802 SetupApplyOrSyncWorker(worker_slot);
4803
4805
4807
4808 proc_exit(0);
4809}
bool InitializingApplyWorker
Definition: worker.c:319
static void run_apply_worker()
Definition: worker.c:4543
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4742
void proc_exit(int code)
Definition: ipc.c:104
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207

References DatumGetInt32(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)

Definition at line 5125 of file worker.c.

5126{
5127 if (isCommit && on_commit_wakeup_workers_subids != NIL)
5128 {
5129 ListCell *lc;
5130
5131 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5133 {
5134 Oid subid = lfirst_oid(lc);
5135 List *workers;
5136 ListCell *lc2;
5137
5138 workers = logicalrep_workers_find(subid, true, false);
5139 foreach(lc2, workers)
5140 {
5141 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5142
5144 }
5145 }
5146 LWLockRelease(LogicalRepWorkerLock);
5147 }
5148
5149 /* The List storage will be reclaimed automatically in xact cleanup. */
5151}
static List * on_commit_wakeup_workers_subids
Definition: worker.c:302
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:266
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:693
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:115
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define lfirst_oid(lc)
Definition: pg_list.h:174
unsigned int Oid
Definition: postgres_ext.h:30
Definition: pg_list.h:54

References lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )

Definition at line 989 of file applyparallelworker.c.

990{
991 InterruptPending = true;
994}
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition: globals.c:32
struct Latch * MyLatch
Definition: globals.c:64
void SetLatch(Latch *latch)
Definition: latch.c:288

References InterruptPending, MyLatch, ParallelApplyMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )

Definition at line 4864 of file worker.c.

4865{
4867}
bool IsLogicalWorker(void)
Definition: worker.c:4855
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )

Definition at line 4855 of file worker.c.

4856{
4857 return MyLogicalRepWorker != NULL;
4858}
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54

References MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)

Definition at line 5111 of file worker.c.

5112{
5113 MemoryContext oldcxt;
5114
5118 MemoryContextSwitchTo(oldcxt);
5119}
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
MemoryContext TopTransactionContext
Definition: mcxt.c:170
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124

References list_append_unique_oid(), MemoryContextSwitchTo(), on_commit_wakeup_workers_subids, and TopTransactionContext.

Referenced by AlterObjectRename_internal(), AlterSubscription(), and AlterSubscriptionOwner_internal().

◆ ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg)

Definition at line 857 of file applyparallelworker.c.

858{
860 dsm_handle handle;
861 dsm_segment *seg;
862 shm_toc *toc;
863 shm_mq *mq;
864 shm_mq_handle *mqh;
865 shm_mq_handle *error_mqh;
866 RepOriginId originid;
867 int worker_slot = DatumGetInt32(main_arg);
868 char originname[NAMEDATALEN];
869
871
872 /* Setup signal handling. */
875 pqsignal(SIGTERM, die);
877
878 /*
879 * Attach to the dynamic shared memory segment for the parallel apply, and
880 * find its table of contents.
881 *
882 * Like parallel query, we don't need resource owner by this time. See
883 * ParallelWorkerMain.
884 */
885 memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
886 seg = dsm_attach(handle);
887 if (!seg)
889 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
890 errmsg("could not map dynamic shared memory segment")));
891
893 if (!toc)
895 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 errmsg("invalid magic number in dynamic shared memory segment")));
897
898 /* Look up the shared information. */
899 shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
900 MyParallelShared = shared;
901
902 /*
903 * Attach to the message queue.
904 */
905 mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
907 mqh = shm_mq_attach(mq, seg, NULL);
908
909 /*
910 * Primary initialization is complete. Now, we can attach to our slot.
911 * This is to ensure that the leader apply worker does not write data to
912 * the uninitialized memory queue.
913 */
914 logicalrep_worker_attach(worker_slot);
915
916 /*
917 * Register the shutdown callback after we are attached to the worker
918 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
919 * callback is invoked.
920 */
922
927
928 /*
929 * Attach to the error queue.
930 */
933 error_mqh = shm_mq_attach(mq, seg, NULL);
934
935 pq_redirect_to_shm_mq(seg, error_mqh);
938
941
943
945
946 /* Setup replication origin tracking. */
949 originname, sizeof(originname));
950 originid = replorigin_by_name(originname, false);
951
952 /*
953 * The parallel apply worker doesn't need to monopolize this replication
954 * origin which was already acquired by its leader process.
955 */
957 replorigin_session_origin = originid;
959
960 /*
961 * Setup callback for syscache so that we know when something changes in
962 * the subscription relation state.
963 */
964 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
966 (Datum) 0);
967
969
971
972 /*
973 * The parallel apply worker must not get here because the parallel apply
974 * worker will only stop when it receives a SIGTERM or SIGINT from the
975 * leader, or when there is an error. None of these cases will allow the
976 * code to reach here.
977 */
978 Assert(false);
979}
static void pa_shutdown(int code, Datum arg)
#define PARALLEL_APPLY_KEY_SHARED
ParallelApplyWorkerShared * MyParallelShared
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:426
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5157
void InitializeLogRepWorker(void)
Definition: worker.c:4648
Subscription * MySubscription
Definition: worker.c:299
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:926
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:1095
dsm_segment * dsm_attach(dsm_handle h)
Definition: dsm.c:665
uint32 dsm_handle
Definition: dsm_impl.h:55
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void logicalrep_worker_attach(int slot)
Definition: launcher.c:704
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1097
#define NAMEDATALEN
#define die(msg)
#define pqsignal
Definition: port.h:531
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
#define InvalidOid
Definition: postgres_ext.h:35
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:200
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition: pqmq.c:78
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition: pqmq.c:53
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:224
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:206
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:290
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition: shm_toc.c:64
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
PGPROC * MyProc
Definition: proc.c:67
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
Definition: shm_mq.c:72
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
#define SIGHUP
Definition: win32_port.h:158
void StartTransactionCommand(void)
Definition: xact.c:3059
void CommitTransactionCommand(void)
Definition: xact.c:3157
uint16 RepOriginId
Definition: xlogdefs.h:65

References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, invalidate_syncing_table_states(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SpinLockAcquire, SpinLockRelease, and StartTransactionCommand().

◆ ProcessParallelApplyMessages()

void ProcessParallelApplyMessages ( void  )

Definition at line 1063 of file applyparallelworker.c.

1064{
1065 ListCell *lc;
1066 MemoryContext oldcontext;
1067
1068 static MemoryContext hpam_context = NULL;
1069
1070 /*
1071 * This is invoked from ProcessInterrupts(), and since some of the
1072 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1073 * for recursive calls if more signals are received while this runs. It's
1074 * unclear that recursive entry would be safe, and it doesn't seem useful
1075 * even if it is safe, so let's block interrupts until done.
1076 */
1078
1079 /*
1080 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1081 * don't want to risk leaking data into long-lived contexts, so let's do
1082 * our work here in a private context that we can reset on each use.
1083 */
1084 if (!hpam_context) /* first time through? */
1086 "ProcessParallelApplyMessages",
1088 else
1089 MemoryContextReset(hpam_context);
1090
1091 oldcontext = MemoryContextSwitchTo(hpam_context);
1092
1094
1095 foreach(lc, ParallelApplyWorkerPool)
1096 {
1097 shm_mq_result res;
1098 Size nbytes;
1099 void *data;
1101
1102 /*
1103 * The leader will detach from the error queue and set it to NULL
1104 * before preparing to stop all parallel apply workers, so we don't
1105 * need to handle error messages anymore. See
1106 * logicalrep_worker_detach.
1107 */
1108 if (!winfo->error_mq_handle)
1109 continue;
1110
1111 res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1112
1113 if (res == SHM_MQ_WOULD_BLOCK)
1114 continue;
1115 else if (res == SHM_MQ_SUCCESS)
1116 {
1117 StringInfoData msg;
1118
1119 initStringInfo(&msg);
1120 appendBinaryStringInfo(&msg, data, nbytes);
1122 pfree(msg.data);
1123 }
1124 else
1125 ereport(ERROR,
1126 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127 errmsg("lost connection to the logical replication parallel apply worker")));
1128 }
1129
1130 MemoryContextSwitchTo(oldcontext);
1131
1132 /* Might as well clear the context on our way out */
1133 MemoryContextReset(hpam_context);
1134
1136}
static List * ParallelApplyWorkerPool
static void ProcessParallelApplyMessage(StringInfo msg)
size_t Size
Definition: c.h:576
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
void pfree(void *pointer)
Definition: mcxt.c:2150
MemoryContext TopMemoryContext
Definition: mcxt.c:165
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
const void * data
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:281
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
shm_mq_handle * error_mq_handle

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ TablesyncWorkerMain()

void TablesyncWorkerMain ( Datum  main_arg)

Definition at line 1718 of file tablesync.c.

1719{
1720 int worker_slot = DatumGetInt32(main_arg);
1721
1722 SetupApplyOrSyncWorker(worker_slot);
1723
1725
1727}
static pg_noreturn void finish_sync_worker(void)
Definition: tablesync.c:143
static void run_tablesync_worker()
Definition: tablesync.c:1692

References DatumGetInt32(), finish_sync_worker(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

Variable Documentation

◆ ParallelApplyMessagePending

PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending
extern