PostgreSQL Source Code git master
Loading...
Searching...
No Matches
logicalworker.h File Reference
#include <signal.h>
Include dependency graph for logicalworker.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyWorkerMain (Datum main_arg)
 
void ParallelApplyWorkerMain (Datum main_arg)
 
void TableSyncWorkerMain (Datum main_arg)
 
void SequenceSyncWorkerMain (Datum main_arg)
 
bool IsLogicalWorker (void)
 
bool IsLogicalParallelApplyWorker (void)
 
void HandleParallelApplyMessageInterrupt (void)
 
void ProcessParallelApplyMessages (void)
 
void LogicalRepWorkersWakeupAtCommit (Oid subid)
 
void AtEOXact_LogicalRepWorkers (bool isCommit)
 

Variables

PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending
 

Function Documentation

◆ ApplyWorkerMain()

void ApplyWorkerMain ( Datum  main_arg)
extern

Definition at line 5987 of file worker.c.

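5988{
5989 int worker_slot = DatumGetInt32(main_arg);
5990
5991 InitializingApplyWorker = true;
5992
5993 SetupApplyOrSyncWorker(worker_slot);
5994
5995 InitializingApplyWorker = false;
5996
5997 run_apply_worker();
5998
5999 proc_exit(0);
6000}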
static void run_apply_worker(void)
Definition worker.c:5666
bool InitializingApplyWorker
Definition worker.c:504
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5947
void proc_exit(int code)
Definition ipc.c:105
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
static int fb(int x)

References DatumGetInt32(), fb(), InitializingApplyWorker, proc_exit(), run_apply_worker(), and SetupApplyOrSyncWorker().

◆ AtEOXact_LogicalRepWorkers()

void AtEOXact_LogicalRepWorkers ( bool  isCommit)
extern

Definition at line 6332 of file worker.c.

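6333{
6334 if (isCommit && on_commit_wakeup_workers_subids != NIL)
6335 {
6336 ListCell *lc;
6337
6338 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6339 foreach(lc, on_commit_wakeup_workers_subids)
6340 {
6341 Oid subid = lfirst_oid(lc);
6342 List *workers;
6343 ListCell *lc2;
6344
6345 workers = logicalrep_workers_find(subid, true, false);
6346 foreach(lc2, workers)
6347 {
6348 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6349
6350 logicalrep_worker_wakeup_ptr(worker);
6351 }
6352 }
6353 LWLockRelease(LogicalRepWorkerLock);
6354 }
6355
6356 /* The List storage will be reclaimed automatically in xact cleanup. */
6357 on_commit_wakeup_workers_subids = NIL;
6358}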
static List * on_commit_wakeup_workers_subids
Definition worker.c:487
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:294
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:747
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1149
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1766
@ LW_SHARED
Definition lwlock.h:105
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define lfirst_oid(lc)
Definition pg_list.h:174
unsigned int Oid
Definition pg_list.h:54

References fb(), lfirst, lfirst_oid, logicalrep_worker_wakeup_ptr(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), NIL, and on_commit_wakeup_workers_subids.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )
extern

Definition at line 999 of file applyparallelworker.c.

1000{
1001 InterruptPending = true;
1002 ParallelApplyMessagePending = true;
1003 /* latch will be set by procsignal_sigusr1_handler */
1004}
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition globals.c:32

References InterruptPending, and ParallelApplyMessagePending.

Referenced by procsignal_sigusr1_handler().

◆ IsLogicalParallelApplyWorker()

bool IsLogicalParallelApplyWorker ( void  )
extern

Definition at line 6074 of file worker.c.

6075{
6076 return IsLogicalWorker() && am_parallel_apply_worker();
6077}
bool IsLogicalWorker(void)
Definition worker.c:6065
static bool am_parallel_apply_worker(void)

References am_parallel_apply_worker(), and IsLogicalWorker().

Referenced by mq_putmessage().

◆ IsLogicalWorker()

bool IsLogicalWorker ( void  )
extern

Definition at line 6065 of file worker.c.

6066{
6067 return MyLogicalRepWorker != NULL;
6068}
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:57

References fb(), and MyLogicalRepWorker.

Referenced by IsLogicalParallelApplyWorker(), and ProcessInterrupts().

◆ LogicalRepWorkersWakeupAtCommit()

void LogicalRepWorkersWakeupAtCommit ( Oid  subid)
extern

Definition at line 6318 of file worker.c.

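6319{
6320 MemoryContext oldcxt;
6321
6322 oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6323 on_commit_wakeup_workers_subids =
6324 list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6325 MemoryContextSwitchTo(oldcxt);
6326}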
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
MemoryContext TopTransactionContext
Definition mcxt.c:171
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124

References fb(), list_append_unique_oid(), MemoryContextSwitchTo(), on_commit_wakeup_workers_subids, and TopTransactionContext.

Referenced by AlterObjectRename_internal(), AlterSubscription(), and AlterSubscriptionOwner_internal().

◆ ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg)
extern

Definition at line 861 of file applyparallelworker.c.

862{
864 dsm_handle handle;
865 dsm_segment *seg;
866 shm_toc *toc;
867 shm_mq *mq;
869 shm_mq_handle *error_mqh;
873
875
876 /*
877 * Setup signal handling.
878 *
879 * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
880 * initiated by the leader apply worker. This helps to differentiate it
881 * from the case where we abort the current transaction and exit on
882 * receiving SIGTERM.
883 */
887
888 /*
889 * Attach to the dynamic shared memory segment for the parallel apply, and
890 * find its table of contents.
891 *
892 * Like parallel query, we don't need resource owner by this time. See
893 * ParallelWorkerMain.
894 */
895 memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
896 seg = dsm_attach(handle);
897 if (!seg)
900 errmsg("could not map dynamic shared memory segment")));
901
903 if (!toc)
906 errmsg("invalid magic number in dynamic shared memory segment")));
907
908 /* Look up the shared information. */
909 shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
910 MyParallelShared = shared;
911
912 /*
913 * Attach to the message queue.
914 */
917 mqh = shm_mq_attach(mq, seg, NULL);
918
919 /*
920 * Primary initialization is complete. Now, we can attach to our slot.
921 * This is to ensure that the leader apply worker does not write data to
922 * the uninitialized memory queue.
923 */
925
926 /*
927 * Register the shutdown callback after we are attached to the worker
928 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
929 * callback is invoked.
930 */
932
937
938 /*
939 * Attach to the error queue.
940 */
943 error_mqh = shm_mq_attach(mq, seg, NULL);
944
945 pq_redirect_to_shm_mq(seg, error_mqh);
948
951
953
955
956 /* Setup replication origin tracking. */
959 originname, sizeof(originname));
961
962 /*
963 * The parallel apply worker doesn't need to monopolize this replication
964 * origin which was already acquired by its leader process.
965 */
969
970 /*
971 * Setup callback for syscache so that we know when something changes in
972 * the subscription relation state.
973 */
976 (Datum) 0);
977
979
981
982 /*
983 * The parallel apply worker must not get here because the parallel apply
984 * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
985 * leader, or SIGINT from itself, or when there is an error. None of these
986 * cases will allow the code to reach here.
987 */
988 Assert(false);
989}
static void pa_shutdown(int code, Datum arg)
#define PARALLEL_APPLY_KEY_SHARED
ParallelApplyWorkerShared * MyParallelShared
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
void set_apply_error_context_origin(char *originname)
Definition worker.c:6364
void InitializeLogRepWorker(void)
Definition worker.c:5779
Subscription * MySubscription
Definition worker.c:484
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:935
#define Assert(condition)
Definition c.h:943
void * dsm_segment_address(dsm_segment *seg)
Definition dsm.c:1095
dsm_segment * dsm_attach(dsm_handle h)
Definition dsm.c:665
uint32 dsm_handle
Definition dsm_impl.h:55
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define ereport(elevel,...)
Definition elog.h:151
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
Definition inval.c:1816
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:344
void logicalrep_worker_attach(int slot)
Definition launcher.c:758
static char * errmsg
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147
#define NAMEDATALEN
#define pqsignal
Definition port.h:547
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
BackgroundWorker * MyBgworkerEntry
Definition postmaster.c:200
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition pqmq.c:84
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition pqmq.c:55
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:226
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition shm_mq.c:208
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition shm_mq.c:292
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition shm_toc.c:232
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition shm_toc.c:64
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:69
char bgw_extra[BGW_EXTRALEN]
Definition bgworker.h:106
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
ReplOriginId origin
Definition origin.h:45
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
#define SIGHUP
Definition win32_port.h:158
#define SIGUSR2
Definition win32_port.h:171
void StartTransactionCommand(void)
Definition xact.c:3081
void CommitTransactionCommand(void)
Definition xact.c:3179
uint16 ReplOriginId
Definition xlogdefs.h:69

References Assert, BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg, ERROR, fb(), LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, InvalidateSyncingRelStates(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, ReplOriginXactState::origin, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_setup(), replorigin_xact_state, LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGUSR2, SpinLockAcquire(), SpinLockRelease(), and StartTransactionCommand().

◆ ProcessParallelApplyMessages()

void ProcessParallelApplyMessages ( void  )
extern

Definition at line 1073 of file applyparallelworker.c.

1074{
1075 ListCell *lc;
1076 MemoryContext oldcontext;
1077
1079
1080 /*
1081 * This is invoked from ProcessInterrupts(), and since some of the
1082 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1083 * for recursive calls if more signals are received while this runs. It's
1084 * unclear that recursive entry would be safe, and it doesn't seem useful
1085 * even if it is safe, so let's block interrupts until done.
1086 */
1088
1089 /*
1090 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1091 * don't want to risk leaking data into long-lived contexts, so let's do
1092 * our work here in a private context that we can reset on each use.
1093 */
1094 if (!hpam_context) /* first time through? */
1096 "ProcessParallelApplyMessages",
1098 else
1100
1101 oldcontext = MemoryContextSwitchTo(hpam_context);
1102
1104
1105 foreach(lc, ParallelApplyWorkerPool)
1106 {
1107 shm_mq_result res;
1108 Size nbytes;
1109 void *data;
1111
1112 /*
1113 * The leader will detach from the error queue and set it to NULL
1114 * before preparing to stop all parallel apply workers, so we don't
1115 * need to handle error messages anymore. See
1116 * logicalrep_worker_detach.
1117 */
1118 if (!winfo->error_mq_handle)
1119 continue;
1120
1121 res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1122
1123 if (res == SHM_MQ_WOULD_BLOCK)
1124 continue;
1125 else if (res == SHM_MQ_SUCCESS)
1126 {
1127 StringInfoData msg;
1128
1129 initStringInfo(&msg);
1130 appendBinaryStringInfo(&msg, data, nbytes);
1132 pfree(msg.data);
1133 }
1134 else
1135 ereport(ERROR,
1137 errmsg("lost connection to the logical replication parallel apply worker")));
1138 }
1139
1140 MemoryContextSwitchTo(oldcontext);
1141
1142 /* Might as well clear the context on our way out */
1144
1146}
static List * ParallelApplyWorkerPool
static void ProcessParallelApplyMessage(StringInfo msg)
size_t Size
Definition c.h:689
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
void pfree(void *pointer)
Definition mcxt.c:1616
MemoryContext TopMemoryContext
Definition mcxt.c:166
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
const void * data
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition shm_mq.c:574
shm_mq_result
Definition shm_mq.h:39
@ SHM_MQ_SUCCESS
Definition shm_mq.h:40
@ SHM_MQ_WOULD_BLOCK
Definition shm_mq.h:41
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
shm_mq_handle * error_mq_handle

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg, ERROR, ParallelApplyWorkerInfo::error_mq_handle, fb(), HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

◆ SequenceSyncWorkerMain()

void SequenceSyncWorkerMain ( Datum  main_arg)
extern

Definition at line 764 of file sequencesync.c.

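765{
766 int worker_slot = DatumGetInt32(main_arg);
767
768 SetupApplyOrSyncWorker(worker_slot);
769
770 start_sequence_sync();
771
772 FinishSyncWorker();
773}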
static void start_sequence_sync(void)
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50

References DatumGetInt32(), fb(), FinishSyncWorker(), SetupApplyOrSyncWorker(), and start_sequence_sync().

◆ TableSyncWorkerMain()

void TableSyncWorkerMain ( Datum  main_arg)
extern

Definition at line 1577 of file tablesync.c.

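1578{
1579 int worker_slot = DatumGetInt32(main_arg);
1580
1581 SetupApplyOrSyncWorker(worker_slot);
1582
1583 run_tablesync_worker();
1584
1585 FinishSyncWorker();
1586}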
static void run_tablesync_worker(void)
Definition tablesync.c:1551

References DatumGetInt32(), fb(), FinishSyncWorker(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

Variable Documentation

◆ ParallelApplyMessagePending