53 #define PARALLEL_ERROR_QUEUE_SIZE 16384 56 #define PARALLEL_MAGIC 0x50477c7c 63 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) 64 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) 65 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003) 66 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004) 67 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005) 68 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) 69 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) 70 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) 71 #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) 72 #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) 73 #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B) 74 #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000C) 75 #define PARALLEL_KEY_ENUMBLACKLIST UINT64CONST(0xFFFFFFFFFFFF000D) 198 Size library_len = 0;
200 Size combocidlen = 0;
205 Size relmapperlen = 0;
206 Size enumblacklistlen = 0;
268 "parallel error queue size not buffer-aligned");
294 if (pcxt->
seg != NULL)
336 char *relmapperspace;
337 char *error_queue_space;
338 char *session_dsm_handle_space;
339 char *entrypointstate;
340 char *enumblacklistspace;
370 *(
dsm_handle *) session_dsm_handle_space = session_dsm_handle;
372 session_dsm_handle_space);
471 char *error_queue_space;
498 bool any_registrations_failed =
false;
514 memset(&worker, 0,
sizeof(worker));
538 memcpy(worker.
bgw_extra, &i,
sizeof(
int));
539 if (!any_registrations_failed &&
558 any_registrations_failed =
true;
671 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
672 errmsg(
"parallel worker failed to initialize"),
673 errhint(
"More details may be available in the server log.")));
720 bool anyone_alive =
false;
790 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
791 errmsg(
"parallel worker failed to initialize"),
792 errhint(
"More details may be available in the server log.")));
811 if (pcxt->
toc != NULL)
852 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
853 errmsg(
"postmaster exited during a parallel transaction")));
902 if (pcxt->
seg != NULL)
989 if (hpm_context == NULL)
991 "HandleParallelMessages",
1007 if (pcxt->
worker == NULL)
1039 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040 errmsg(
"lost connection to parallel worker")));
1106 _(
"parallel worker"));
1132 const char *channel;
1133 const char *payload;
1154 elog(
ERROR,
"unrecognized message type received from parallel worker: %c (message length %d bytes)",
1176 if (pcxt->
subid != mySubId)
1210 char *error_queue_space;
1214 char *entrypointstate;
1216 char *function_name;
1219 char *combocidspace;
1224 char *relmapperspace;
1225 char *enumblacklistspace;
1227 char *session_dsm_handle_space;
1257 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1258 errmsg(
"could not map dynamic shared memory segment")));
1262 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1263 errmsg(
"invalid magic number in dynamic shared memory segment")));
1267 MyFixedParallelState = fps;
1281 mq = (
shm_mq *) (error_queue_space +
1332 library_name = entrypointstate;
1333 function_name = entrypointstate + strlen(library_name) + 1;
1371 session_dsm_handle_space =
1503 if (strcmp(libraryname,
"postgres") == 0)
1514 elog(
ERROR,
"internal function \"%s\" not found", funcname);
char bgw_extra[BGW_EXTRALEN]
#define DatumGetUInt32(X)
void SerializeEnumBlacklist(void *space, Size size)
#define PARALLEL_ERROR_QUEUE_SIZE
static const struct @21 InternalParallelWorkers[]
#define AllocSetContextCreate
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
int errhint(const char *fmt,...)
Snapshot RestoreSnapshot(char *start_address)
MemoryContext TopTransactionContext
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
void SetUserIdAndSecContext(Oid userid, int sec_context)
static void dlist_push_head(dlist_head *head, dlist_node *node)
XLogRecPtr XactLastRecEnd
void AttachSerializableXact(SerializableXactHandle handle)
void shm_mq_detach(shm_mq_handle *mqh)
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
#define dlist_foreach(iter, lhead)
void SerializeReindexState(Size maxsize, char *start_address)
char * pstrdup(const char *in)
void CommitTransactionCommand(void)
shm_toc_estimator estimator
char * psprintf(const char *fmt,...)
void EndParallelWorkerTransaction(void)
#define SpinLockInit(lock)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
dsm_segment * dsm_attach(dsm_handle h)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
PGPROC * shm_mq_get_sender(shm_mq *mq)
Oid authenticated_user_id
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Snapshot GetActiveSnapshot(void)
#define PARALLEL_KEY_ENUMBLACKLIST
dsm_handle dsm_segment_handle(dsm_segment *seg)
int errcode(int sqlerrcode)
PGPROC * parallel_master_pgproc
Oid temp_toast_namespace_id
#define BGWORKER_CLASS_PARALLEL
BackgroundWorker * MyBgworkerEntry
void MemoryContextReset(MemoryContext context)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void PopActiveSnapshot(void)
int nknown_attached_workers
#define PARALLEL_KEY_RELMAPPER_STATE
Size shm_toc_estimate(shm_toc_estimator *e)
void SerializeTransactionState(Size maxsize, char *start_address)
parallel_worker_main_type fn_addr
#define shm_toc_estimate_chunk(e, sz)
void SetLatch(Latch *latch)
#define BGWORKER_SHMEM_ACCESS
Snapshot GetTransactionSnapshot(void)
void InvalidateSystemCaches(void)
char bgw_function_name[BGW_MAXLEN]
static pid_t ParallelMasterPid
void ResetLatch(Latch *latch)
void RestoreComboCIDState(char *comboCIDstate)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Oid GetCurrentRoleId(void)
#define RESUME_INTERRUPTS()
ErrorContextCallback * error_context_stack
SerializableXactHandle serializable_xact_handle
volatile bool ParallelMessagePending
#define DSM_HANDLE_INVALID
#define StaticAssertStmt(condition, errmessage)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
#define SpinLockAcquire(lock)
void DestroyParallelContext(ParallelContext *pcxt)
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
#define dlist_container(type, membername, ptr)
ParallelWorkerInfo * worker
void pfree(void *pointer)
bool IsInParallelMode(void)
void SerializeLibraryState(Size maxsize, char *start_address)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
Oid GetAuthenticatedUserId(void)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void SerializeSnapshot(Snapshot snapshot, char *start_address)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
shm_mq * shm_mq_create(void *address, Size size)
void ExitParallelMode(void)
#define ALLOCSET_DEFAULT_SIZES
#define PARALLEL_KEY_FIXED
void HandleParallelMessages(void)
#define PARALLEL_KEY_ERROR_QUEUE
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
void PushActiveSnapshot(Snapshot snap)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
shm_mq_handle * error_mqh
int SetClientEncoding(int encoding)
BackgroundWorkerHandle * bgwhandle
void SerializeRelationMap(Size maxSize, char *startAddress)
Size EstimateGUCStateSpace(void)
void AttachSession(dsm_handle handle)
#define BGW_NEVER_RESTART
#define shm_toc_initialize_estimator(e)
Size EstimateComboCIDStateSpace(void)
#define UInt32GetDatum(X)
MemoryContext CurrentMemoryContext
static void ParallelWorkerShutdown(int code, Datum arg)
static void dlist_delete(dlist_node *node)
Size EstimateEnumBlacklistSpace(void)
PGFunction load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void LaunchParallelWorkers(ParallelContext *pcxt)
Size EstimateReindexStateSpace(void)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void BecomeLockGroupLeader(void)
#define ereport(elevel, rest)
MemoryContext TopMemoryContext
TimestampTz GetCurrentTransactionStartTimestamp(void)
void ThrowErrorData(ErrorData *edata)
void initStringInfo(StringInfo str)
#define DLIST_STATIC_INIT(name)
void InitializeParallelDSM(ParallelContext *pcxt)
bool * known_attached_workers
bool ParallelContextActive(void)
#define SpinLockRelease(lock)
#define dlist_head_element(type, membername, lhead)
Size EstimateSnapshotSpace(Snapshot snap)
Size mul_size(Size s1, Size s2)
void * palloc0(Size size)
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
void RestoreLibraryState(char *start_address)
void RestoreEnumBlacklist(void *space)
dsm_segment * dsm_create(Size size, int flags)
shm_toc * shm_toc_attach(uint64 magic, void *address)
int GetDatabaseEncoding(void)
pid_t parallel_master_pid
Size EstimateLibraryStateSpace(void)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void * SerializableXactHandle
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
BackendId parallel_master_backend_id
void * dsm_segment_address(dsm_segment *seg)
char bgw_name[BGW_MAXLEN]
#define Assert(condition)
BackendId ParallelMasterBackendId
void StartParallelWorkerTransaction(char *tstatespace)
#define BGWORKER_BACKEND_DATABASE_CONNECTION
SubTransactionId GetCurrentSubTransactionId(void)
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
#define PARALLEL_KEY_REINDEX_STATE
static bool dlist_is_empty(dlist_head *head)
BgWorkerStartTime bgw_start_time
#define shm_toc_estimate_keys(e, cnt)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
void EnterParallelMode(void)
volatile sig_atomic_t InterruptPending
Size EstimateRelationMapSpace(void)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
ErrorContextCallback * error_context_stack
void pq_set_parallel_master(pid_t pid, BackendId backend_id)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
#define PARALLEL_KEY_TRANSACTION_STATE
char bgw_type[BGW_MAXLEN]
void dsm_detach(dsm_segment *seg)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void RestoreReindexState(void *reindexstate)
int errmsg(const char *fmt,...)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void ParallelWorkerMain(Datum main_arg)
static FixedParallelState * MyFixedParallelState
void * MemoryContextAlloc(MemoryContext context, Size size)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
#define HOLD_INTERRUPTS()
bool InitializingParallelWorker
void RestoreRelationMap(char *startAddress)
#define PARALLEL_KEY_SESSION_DSM
void HandleParallelMessageInterrupt(void)
#define CHECK_FOR_INTERRUPTS()
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
static dlist_head pcxt_list
#define pq_putmessage(msgtype, s, len)
static void static void status(const char *fmt,...) pg_attribute_printf(1
struct FixedParallelState FixedParallelState
dsm_handle GetSessionDsmHandle(void)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
char bgw_library_name[BGW_MAXLEN]
void WaitForParallelWorkersToAttach(ParallelContext *pcxt)
bool session_auth_is_superuser
SerializableXactHandle ShareSerializableXact(void)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_ENTRYPOINT
#define PARALLEL_KEY_COMBO_CID
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void SerializeGUCState(Size maxsize, char *start_address)
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
void SerializeComboCIDState(Size maxsize, char *start_address)
#define PARALLEL_KEY_LIBRARY
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
void RestoreGUCState(void *gucstate)
#define WL_EXIT_ON_PM_DEATH
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
TimestampTz GetCurrentStatementStartTimestamp(void)
void BackgroundWorkerUnblockSignals(void)