54#define PARALLEL_ERROR_QUEUE_SIZE 16384
57#define PARALLEL_MAGIC 0x50477c7c
64#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
65#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
66#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
67#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
68#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
69#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
70#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
71#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
72#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
73#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
74#define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
75#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
76#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
77#define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
78#define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
210 Size library_len = 0;
212 Size combocidlen = 0;
216 Size pendingsyncslen = 0;
218 Size relmapperlen = 0;
219 Size uncommittedenumslen = 0;
220 Size clientconninfolen = 0;
298 "parallel error queue size not buffer-aligned");
324 if (pcxt->
seg != NULL)
367 char *pendingsyncsspace;
369 char *relmapperspace;
370 char *error_queue_space;
371 char *session_dsm_handle_space;
372 char *entrypointstate;
373 char *uncommittedenumsspace;
374 char *clientconninfospace;
412 *(
dsm_handle *) session_dsm_handle_space = session_dsm_handle;
414 session_dsm_handle_space);
440 uncommittedenumslen);
443 uncommittedenumsspace);
449 clientconninfospace);
529 char *error_queue_space;
574 bool any_registrations_failed =
false;
590 memset(&worker, 0,
sizeof(worker));
615 if (!any_registrations_failed &&
634 any_registrations_failed =
true;
747 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
748 errmsg(
"parallel worker failed to initialize"),
749 errhint(
"More details may be available in the server log.")));
764 -1, WAIT_EVENT_BGWORKER_STARTUP);
796 bool anyone_alive =
false;
866 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
867 errmsg(
"parallel worker failed to initialize"),
868 errhint(
"More details may be available in the server log.")));
883 WAIT_EVENT_PARALLEL_FINISH);
887 if (pcxt->
toc != NULL)
928 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
929 errmsg(
"postmaster exited during a parallel transaction")));
978 if (pcxt->
seg != NULL)
1004 if (pcxt->
worker != NULL)
1065 if (hpm_context == NULL)
1067 "HandleParallelMessages",
1083 if (pcxt->
worker == NULL)
1115 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1116 errmsg(
"lost connection to parallel worker")));
1172 _(
"parallel worker"));
1198 const char *channel;
1199 const char *payload;
1237 elog(
ERROR,
"unrecognized message type received from parallel worker: %c (message length %d bytes)",
1257 if (pcxt->
subid != mySubId)
1293 char *error_queue_space;
1297 char *entrypointstate;
1299 char *function_name;
1302 char *combocidspace;
1306 char *pendingsyncsspace;
1308 char *relmapperspace;
1309 char *uncommittedenumsspace;
1310 char *clientconninfospace;
1311 char *session_dsm_handle_space;
1343 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1344 errmsg(
"could not map dynamic shared memory segment")));
1348 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1349 errmsg(
"invalid magic number in dynamic shared memory segment")));
1367 mq = (
shm_mq *) (error_queue_space +
1406 library_name = entrypointstate;
1407 function_name = entrypointstate + strlen(library_name) + 1;
1474 session_dsm_handle_space =
1644 if (strcmp(libraryname,
"postgres") == 0)
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
#define PARALLEL_KEY_TRANSACTION_STATE
void HandleParallelMessageInterrupt(void)
struct FixedParallelState FixedParallelState
bool InitializingParallelWorker
parallel_worker_main_type fn_addr
#define PARALLEL_KEY_UNCOMMITTEDENUMS
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void InitializeParallelDSM(ParallelContext *pcxt)
#define PARALLEL_KEY_CLIENTCONNINFO
static FixedParallelState * MyFixedParallelState
#define PARALLEL_KEY_PENDING_SYNCS
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void HandleParallelMessages(void)
void DestroyParallelContext(ParallelContext *pcxt)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
#define PARALLEL_KEY_ERROR_QUEUE
#define PARALLEL_KEY_SESSION_DSM
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
bool ParallelContextActive(void)
void ParallelWorkerMain(Datum main_arg)
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
static pid_t ParallelLeaderPid
#define PARALLEL_KEY_REINDEX_STATE
#define PARALLEL_KEY_LIBRARY
static void ParallelWorkerShutdown(int code, Datum arg)
static dlist_head pcxt_list
#define PARALLEL_KEY_FIXED
#define PARALLEL_KEY_ENTRYPOINT
volatile sig_atomic_t ParallelMessagePending
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
#define PARALLEL_KEY_COMBO_CID
static const struct @16 InternalParallelWorkers[]
void WaitForParallelWorkersToAttach(ParallelContext *pcxt)
#define PARALLEL_ERROR_QUEUE_SIZE
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_RELMAPPER_STATE
void pgstat_progress_incr_param(int index, int64 incr)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_BYPASS_ROLELOGINCHECK
#define BGWORKER_CLASS_PARALLEL
@ BgWorkerStart_ConsistentState
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_BYPASS_ALLOWCONN
#define BGWORKER_SHMEM_ACCESS
void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
#define Assert(condition)
#define StaticAssertStmt(condition, errmessage)
void RestoreComboCIDState(char *comboCIDstate)
void SerializeComboCIDState(Size maxsize, char *start_address)
Size EstimateComboCIDStateSpace(void)
static void PGresult * res
void RestoreLibraryState(char *start_address)
void SerializeLibraryState(Size maxsize, char *start_address)
Size EstimateLibraryStateSpace(void)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
#define DSM_HANDLE_INVALID
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
void ThrowErrorData(ErrorData *edata)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ProcNumber ParallelLeaderProcNumber
volatile sig_atomic_t InterruptPending
void RestoreGUCState(void *gucstate)
void SerializeGUCState(Size maxsize, char *start_address)
Size EstimateGUCStateSpace(void)
bool current_role_is_superuser
const char * hba_authname(UserAuth auth_method)
#define dlist_foreach(iter, lhead)
#define dlist_head_element(type, membername, lhead)
static void dlist_delete(dlist_node *node)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void SerializeReindexState(Size maxsize, char *start_address)
void RestoreReindexState(const void *reindexstate)
Size EstimateReindexStateSpace(void)
void InvalidateSystemCaches(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
#define pq_putmessage(msgtype, s, len)
int GetDatabaseEncoding(void)
int SetClientEncoding(int encoding)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define INTERRUPTS_CAN_BE_PROCESSED()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
void SerializeClientConnectionInfo(Size maxsize, char *start_address)
void InitializeSystemUser(const char *authn_id, const char *auth_method)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
void SetSessionAuthorization(Oid userid, bool is_superuser)
bool GetSessionUserIsSuperuser(void)
Size EstimateClientConnectionInfoSpace(void)
Oid GetSessionUserId(void)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
Oid GetAuthenticatedUserId(void)
ClientConnectionInfo MyClientConnectionInfo
void RestoreClientConnectionInfo(char *conninfo)
void SetAuthenticatedUserId(Oid userid)
Oid GetCurrentRoleId(void)
void SetUserIdAndSecContext(Oid userid, int sec_context)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
void RestoreUncommittedEnums(void *space)
Size EstimateUncommittedEnumsSpace(void)
void SerializeUncommittedEnums(void *space, Size size)
pqsigfunc pqsignal(int signo, pqsigfunc func)
static uint32 DatumGetUInt32(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum UInt32GetDatum(uint32 X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void AttachSerializableXact(SerializableXactHandle handle)
SerializableXactHandle ShareSerializableXact(void)
void * SerializableXactHandle
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_MESSAGE
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
MemoryContextSwitchTo(old_ctx)
Size EstimateRelationMapSpace(void)
void SerializeRelationMap(Size maxSize, char *startAddress)
void RestoreRelationMap(char *startAddress)
void AttachSession(dsm_handle handle)
dsm_handle GetSessionDsmHandle(void)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
PGPROC * shm_mq_get_sender(shm_mq *mq)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
Snapshot RestoreSnapshot(char *start_address)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
void PopActiveSnapshot(void)
Size EstimateSnapshotSpace(Snapshot snapshot)
Snapshot GetActiveSnapshot(void)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void BecomeLockGroupLeader(void)
void SerializePendingSyncs(Size maxSize, char *startAddress)
Size EstimatePendingSyncsSpace(void)
void RestorePendingSyncs(char *startAddress)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
Oid temp_toast_namespace_id
SerializableXactHandle serializable_xact_handle
PGPROC * parallel_leader_pgproc
bool session_user_is_superuser
pid_t parallel_leader_pid
Oid authenticated_user_id
ProcNumber parallel_leader_proc_number
bool * known_attached_workers
ErrorContextCallback * error_context_stack
shm_toc_estimator estimator
int nknown_attached_workers
ParallelWorkerInfo * worker
BackgroundWorkerHandle * bgwhandle
shm_mq_handle * error_mqh
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
void SerializeTransactionState(Size maxsize, char *start_address)
void ExitParallelMode(void)
SubTransactionId GetCurrentSubTransactionId(void)
void EnterParallelMode(void)
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
void StartParallelWorkerTransaction(char *tstatespace)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
bool IsInParallelMode(void)
TimestampTz GetCurrentStatementStartTimestamp(void)
TimestampTz GetCurrentTransactionStartTimestamp(void)
void EndParallelWorkerTransaction(void)
void CommitTransactionCommand(void)
#define IsolationUsesXactSnapshot()
XLogRecPtr XactLastRecEnd