55 #define PARALLEL_ERROR_QUEUE_SIZE 16384
58 #define PARALLEL_MAGIC 0x50477c7c
65 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
66 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
67 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
68 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
69 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
70 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
71 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
72 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
73 #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
74 #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
75 #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
76 #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
77 #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
78 #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
79 #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
206 Size library_len = 0;
208 Size combocidlen = 0;
212 Size pendingsyncslen = 0;
214 Size relmapperlen = 0;
215 Size uncommittedenumslen = 0;
216 Size clientconninfolen = 0;
285 "parallel error queue size not buffer-aligned");
311 if (pcxt->
seg != NULL)
352 char *pendingsyncsspace;
354 char *relmapperspace;
355 char *error_queue_space;
356 char *session_dsm_handle_space;
357 char *entrypointstate;
358 char *uncommittedenumsspace;
359 char *clientconninfospace;
397 *(
dsm_handle *) session_dsm_handle_space = session_dsm_handle;
399 session_dsm_handle_space);
425 uncommittedenumslen);
428 uncommittedenumsspace);
434 clientconninfospace);
511 char *error_queue_space;
555 bool any_registrations_failed =
false;
571 memset(&worker, 0,
sizeof(worker));
596 if (!any_registrations_failed &&
615 any_registrations_failed =
true;
728 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
729 errmsg(
"parallel worker failed to initialize"),
730 errhint(
"More details may be available in the server log.")));
777 bool anyone_alive =
false;
847 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
848 errmsg(
"parallel worker failed to initialize"),
849 errhint(
"More details may be available in the server log.")));
868 if (pcxt->
toc != NULL)
909 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
910 errmsg(
"postmaster exited during a parallel transaction")));
959 if (pcxt->
seg != NULL)
1046 if (hpm_context == NULL)
1048 "HandleParallelMessages",
1064 if (pcxt->
worker == NULL)
1096 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1097 errmsg(
"lost connection to parallel worker")));
1163 _(
"parallel worker"));
1189 const char *channel;
1190 const char *payload;
1211 elog(
ERROR,
"unrecognized message type received from parallel worker: %c (message length %d bytes)",
1233 if (pcxt->
subid != mySubId)
1267 char *error_queue_space;
1271 char *entrypointstate;
1273 char *function_name;
1276 char *combocidspace;
1280 char *pendingsyncsspace;
1282 char *relmapperspace;
1283 char *uncommittedenumsspace;
1284 char *clientconninfospace;
1286 char *session_dsm_handle_space;
1318 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1319 errmsg(
"could not map dynamic shared memory segment")));
1323 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1324 errmsg(
"invalid magic number in dynamic shared memory segment")));
1342 mq = (
shm_mq *) (error_queue_space +
1393 library_name = entrypointstate;
1394 function_name = entrypointstate + strlen(library_name) + 1;
1432 session_dsm_handle_space =
1605 if (strcmp(libraryname,
"postgres") == 0)
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static const struct @13 InternalParallelWorkers[]
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
#define PARALLEL_KEY_TRANSACTION_STATE
void HandleParallelMessageInterrupt(void)
struct FixedParallelState FixedParallelState
bool InitializingParallelWorker
parallel_worker_main_type fn_addr
#define PARALLEL_KEY_UNCOMMITTEDENUMS
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void InitializeParallelDSM(ParallelContext *pcxt)
#define PARALLEL_KEY_CLIENTCONNINFO
static FixedParallelState * MyFixedParallelState
#define PARALLEL_KEY_PENDING_SYNCS
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void HandleParallelMessages(void)
void DestroyParallelContext(ParallelContext *pcxt)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
#define PARALLEL_KEY_ERROR_QUEUE
#define PARALLEL_KEY_SESSION_DSM
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
bool ParallelContextActive(void)
void ParallelWorkerMain(Datum main_arg)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
static pid_t ParallelLeaderPid
#define PARALLEL_KEY_REINDEX_STATE
#define PARALLEL_KEY_LIBRARY
static void ParallelWorkerShutdown(int code, Datum arg)
static dlist_head pcxt_list
#define PARALLEL_KEY_FIXED
#define PARALLEL_KEY_ENTRYPOINT
volatile sig_atomic_t ParallelMessagePending
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
#define PARALLEL_KEY_COMBO_CID
void WaitForParallelWorkersToAttach(ParallelContext *pcxt)
#define PARALLEL_ERROR_QUEUE_SIZE
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_RELMAPPER_STATE
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_CLASS_PARALLEL
@ BgWorkerStart_ConsistentState
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_SHMEM_ACCESS
#define StaticAssertStmt(condition, errmessage)
void RestoreComboCIDState(char *comboCIDstate)
void SerializeComboCIDState(Size maxsize, char *start_address)
Size EstimateComboCIDStateSpace(void)
static void PGresult * res
elog(ERROR, "%s: %s", p2, msg)
void RestoreLibraryState(char *start_address)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void SerializeLibraryState(Size maxsize, char *start_address)
Size EstimateLibraryStateSpace(void)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
dsm_segment * dsm_attach(dsm_handle h)
dsm_segment * dsm_create(Size size, int flags)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
#define DSM_HANDLE_INVALID
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
void ThrowErrorData(ErrorData *edata)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
volatile sig_atomic_t InterruptPending
BackendId ParallelLeaderBackendId
void RestoreGUCState(void *gucstate)
void SerializeGUCState(Size maxsize, char *start_address)
Size EstimateGUCStateSpace(void)
bool session_auth_is_superuser
const char * hba_authname(UserAuth auth_method)
#define dlist_foreach(iter, lhead)
#define dlist_head_element(type, membername, lhead)
static void dlist_delete(dlist_node *node)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void RestoreReindexState(void *reindexstate)
void SerializeReindexState(Size maxsize, char *start_address)
Size EstimateReindexStateSpace(void)
void InvalidateSystemCaches(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
#define pq_putmessage(msgtype, s, len)
Assert(fmt[strlen(fmt) - 1] !='\n')
int GetDatabaseEncoding(void)
int SetClientEncoding(int encoding)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void * MemoryContextAlloc(MemoryContext context, Size size)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
void SerializeClientConnectionInfo(Size maxsize, char *start_address)
void InitializeSystemUser(const char *authn_id, const char *auth_method)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
Size EstimateClientConnectionInfoSpace(void)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
Oid GetAuthenticatedUserId(void)
ClientConnectionInfo MyClientConnectionInfo
void RestoreClientConnectionInfo(char *conninfo)
Oid GetCurrentRoleId(void)
void SetUserIdAndSecContext(Oid userid, int sec_context)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void RestoreUncommittedEnums(void *space)
Size EstimateUncommittedEnumsSpace(void)
void SerializeUncommittedEnums(void *space, Size size)
pqsigfunc pqsignal(int signo, pqsigfunc func)
static uint32 DatumGetUInt32(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum UInt32GetDatum(uint32 X)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, BackendId backend_id)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void AttachSerializableXact(SerializableXactHandle handle)
SerializableXactHandle ShareSerializableXact(void)
void * SerializableXactHandle
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
@ PROCSIG_PARALLEL_MESSAGE
char * psprintf(const char *fmt,...)
Size EstimateRelationMapSpace(void)
void SerializeRelationMap(Size maxSize, char *startAddress)
void RestoreRelationMap(char *startAddress)
void AttachSession(dsm_handle handle)
dsm_handle GetSessionDsmHandle(void)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
PGPROC * shm_mq_get_sender(shm_mq *mq)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_toc * shm_toc_attach(uint64 magic, void *address)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
Snapshot RestoreSnapshot(char *start_address)
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
void PopActiveSnapshot(void)
Size EstimateSnapshotSpace(Snapshot snapshot)
Snapshot GetActiveSnapshot(void)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void BecomeLockGroupLeader(void)
void SerializePendingSyncs(Size maxSize, char *startAddress)
Size EstimatePendingSyncsSpace(void)
void RestorePendingSyncs(char *startAddress)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[BGW_MAXLEN]
Oid temp_toast_namespace_id
SerializableXactHandle serializable_xact_handle
PGPROC * parallel_leader_pgproc
pid_t parallel_leader_pid
BackendId parallel_leader_backend_id
Oid authenticated_user_id
bool * known_attached_workers
ErrorContextCallback * error_context_stack
shm_toc_estimator estimator
int nknown_attached_workers
ParallelWorkerInfo * worker
BackgroundWorkerHandle * bgwhandle
shm_mq_handle * error_mqh
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
@ WAIT_EVENT_BGWORKER_STARTUP
@ WAIT_EVENT_PARALLEL_FINISH
void SerializeTransactionState(Size maxsize, char *start_address)
void ExitParallelMode(void)
SubTransactionId GetCurrentSubTransactionId(void)
void EnterParallelMode(void)
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
void StartParallelWorkerTransaction(char *tstatespace)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
bool IsInParallelMode(void)
TimestampTz GetCurrentStatementStartTimestamp(void)
TimestampTz GetCurrentTransactionStartTimestamp(void)
void EndParallelWorkerTransaction(void)
void CommitTransactionCommand(void)
#define IsolationUsesXactSnapshot()
XLogRecPtr XactLastRecEnd