54 #define PARALLEL_ERROR_QUEUE_SIZE 16384
57 #define PARALLEL_MAGIC 0x50477c7c
64 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
65 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
66 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
67 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
68 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
69 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
70 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
71 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
72 #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
73 #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
74 #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
75 #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
76 #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
77 #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
78 #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
208 Size library_len = 0;
210 Size combocidlen = 0;
214 Size pendingsyncslen = 0;
216 Size relmapperlen = 0;
217 Size uncommittedenumslen = 0;
218 Size clientconninfolen = 0;
287 "parallel error queue size not buffer-aligned");
313 if (pcxt->
seg != NULL)
354 char *pendingsyncsspace;
356 char *relmapperspace;
357 char *error_queue_space;
358 char *session_dsm_handle_space;
359 char *entrypointstate;
360 char *uncommittedenumsspace;
361 char *clientconninfospace;
399 *(
dsm_handle *) session_dsm_handle_space = session_dsm_handle;
401 session_dsm_handle_space);
427 uncommittedenumslen);
430 uncommittedenumsspace);
436 clientconninfospace);
513 char *error_queue_space;
557 bool any_registrations_failed =
false;
573 memset(&worker, 0,
sizeof(worker));
598 if (!any_registrations_failed &&
617 any_registrations_failed =
true;
730 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
731 errmsg(
"parallel worker failed to initialize"),
732 errhint(
"More details may be available in the server log.")));
747 -1, WAIT_EVENT_BGWORKER_STARTUP);
779 bool anyone_alive =
false;
849 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
850 errmsg(
"parallel worker failed to initialize"),
851 errhint(
"More details may be available in the server log.")));
866 WAIT_EVENT_PARALLEL_FINISH);
870 if (pcxt->
toc != NULL)
911 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
912 errmsg(
"postmaster exited during a parallel transaction")));
961 if (pcxt->
seg != NULL)
1048 if (hpm_context == NULL)
1050 "HandleParallelMessages",
1066 if (pcxt->
worker == NULL)
1098 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1099 errmsg(
"lost connection to parallel worker")));
1155 _(
"parallel worker"));
1181 const char *channel;
1182 const char *payload;
1220 elog(
ERROR,
"unrecognized message type received from parallel worker: %c (message length %d bytes)",
1240 if (pcxt->
subid != mySubId)
1276 char *error_queue_space;
1280 char *entrypointstate;
1282 char *function_name;
1285 char *combocidspace;
1289 char *pendingsyncsspace;
1291 char *relmapperspace;
1292 char *uncommittedenumsspace;
1293 char *clientconninfospace;
1294 char *session_dsm_handle_space;
1326 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1327 errmsg(
"could not map dynamic shared memory segment")));
1331 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1332 errmsg(
"invalid magic number in dynamic shared memory segment")));
1350 mq = (
shm_mq *) (error_queue_space +
1389 library_name = entrypointstate;
1390 function_name = entrypointstate + strlen(library_name) + 1;
1428 session_dsm_handle_space =
1609 if (strcmp(libraryname,
"postgres") == 0)
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
#define PARALLEL_KEY_TRANSACTION_STATE
void HandleParallelMessageInterrupt(void)
struct FixedParallelState FixedParallelState
bool InitializingParallelWorker
parallel_worker_main_type fn_addr
#define PARALLEL_KEY_UNCOMMITTEDENUMS
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void InitializeParallelDSM(ParallelContext *pcxt)
#define PARALLEL_KEY_CLIENTCONNINFO
static FixedParallelState * MyFixedParallelState
#define PARALLEL_KEY_PENDING_SYNCS
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void HandleParallelMessages(void)
void DestroyParallelContext(ParallelContext *pcxt)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
#define PARALLEL_KEY_ERROR_QUEUE
#define PARALLEL_KEY_SESSION_DSM
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
bool ParallelContextActive(void)
void ParallelWorkerMain(Datum main_arg)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
static pid_t ParallelLeaderPid
#define PARALLEL_KEY_REINDEX_STATE
#define PARALLEL_KEY_LIBRARY
static void ParallelWorkerShutdown(int code, Datum arg)
static dlist_head pcxt_list
#define PARALLEL_KEY_FIXED
#define PARALLEL_KEY_ENTRYPOINT
volatile sig_atomic_t ParallelMessagePending
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
#define PARALLEL_KEY_COMBO_CID
static const struct @16 InternalParallelWorkers[]
void WaitForParallelWorkersToAttach(ParallelContext *pcxt)
#define PARALLEL_ERROR_QUEUE_SIZE
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_RELMAPPER_STATE
void pgstat_progress_incr_param(int index, int64 incr)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_CLASS_PARALLEL
@ BgWorkerStart_ConsistentState
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_SHMEM_ACCESS
void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
#define Assert(condition)
#define StaticAssertStmt(condition, errmessage)
void RestoreComboCIDState(char *comboCIDstate)
void SerializeComboCIDState(Size maxsize, char *start_address)
Size EstimateComboCIDStateSpace(void)
static void PGresult * res
void RestoreLibraryState(char *start_address)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void SerializeLibraryState(Size maxsize, char *start_address)
Size EstimateLibraryStateSpace(void)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
dsm_segment * dsm_attach(dsm_handle h)
dsm_segment * dsm_create(Size size, int flags)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
#define DSM_HANDLE_INVALID
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
void ThrowErrorData(ErrorData *edata)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ProcNumber ParallelLeaderProcNumber
volatile sig_atomic_t InterruptPending
void RestoreGUCState(void *gucstate)
void SerializeGUCState(Size maxsize, char *start_address)
Size EstimateGUCStateSpace(void)
bool current_role_is_superuser
const char * hba_authname(UserAuth auth_method)
#define dlist_foreach(iter, lhead)
#define dlist_head_element(type, membername, lhead)
static void dlist_delete(dlist_node *node)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void SerializeReindexState(Size maxsize, char *start_address)
void RestoreReindexState(const void *reindexstate)
Size EstimateReindexStateSpace(void)
void InvalidateSystemCaches(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
#define pq_putmessage(msgtype, s, len)
int GetDatabaseEncoding(void)
int SetClientEncoding(int encoding)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
void SerializeClientConnectionInfo(Size maxsize, char *start_address)
void InitializeSystemUser(const char *authn_id, const char *auth_method)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Size EstimateClientConnectionInfoSpace(void)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
Oid GetAuthenticatedUserId(void)
ClientConnectionInfo MyClientConnectionInfo
void RestoreClientConnectionInfo(char *conninfo)
Oid GetCurrentRoleId(void)
void SetUserIdAndSecContext(Oid userid, int sec_context)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
void RestoreUncommittedEnums(void *space)
Size EstimateUncommittedEnumsSpace(void)
void SerializeUncommittedEnums(void *space, Size size)
pqsigfunc pqsignal(int signo, pqsigfunc func)
static uint32 DatumGetUInt32(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum UInt32GetDatum(uint32 X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void AttachSerializableXact(SerializableXactHandle handle)
SerializableXactHandle ShareSerializableXact(void)
void * SerializableXactHandle
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_MESSAGE
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
MemoryContextSwitchTo(old_ctx)
Size EstimateRelationMapSpace(void)
void SerializeRelationMap(Size maxSize, char *startAddress)
void RestoreRelationMap(char *startAddress)
void AttachSession(dsm_handle handle)
dsm_handle GetSessionDsmHandle(void)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
PGPROC * shm_mq_get_sender(shm_mq *mq)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_toc * shm_toc_attach(uint64 magic, void *address)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
Snapshot RestoreSnapshot(char *start_address)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
void PopActiveSnapshot(void)
Size EstimateSnapshotSpace(Snapshot snapshot)
Snapshot GetActiveSnapshot(void)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void BecomeLockGroupLeader(void)
void SerializePendingSyncs(Size maxSize, char *startAddress)
Size EstimatePendingSyncsSpace(void)
void RestorePendingSyncs(char *startAddress)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
Oid temp_toast_namespace_id
SerializableXactHandle serializable_xact_handle
PGPROC * parallel_leader_pgproc
pid_t parallel_leader_pid
Oid authenticated_user_id
ProcNumber parallel_leader_proc_number
bool * known_attached_workers
ErrorContextCallback * error_context_stack
shm_toc_estimator estimator
int nknown_attached_workers
ParallelWorkerInfo * worker
BackgroundWorkerHandle * bgwhandle
shm_mq_handle * error_mqh
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
void SerializeTransactionState(Size maxsize, char *start_address)
void ExitParallelMode(void)
SubTransactionId GetCurrentSubTransactionId(void)
void EnterParallelMode(void)
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
void StartParallelWorkerTransaction(char *tstatespace)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
bool IsInParallelMode(void)
TimestampTz GetCurrentStatementStartTimestamp(void)
TimestampTz GetCurrentTransactionStartTimestamp(void)
void EndParallelWorkerTransaction(void)
void CommitTransactionCommand(void)
#define IsolationUsesXactSnapshot()
XLogRecPtr XactLastRecEnd