PostgreSQL Source Code git master
Loading...
Searching...
No Matches
worker_spi.c
Go to the documentation of this file.
1/* -------------------------------------------------------------------------
2 *
3 * worker_spi.c
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
9 *
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
15 *
16 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
17 *
18 * IDENTIFICATION
19 * src/test/modules/worker_spi/worker_spi.c
20 *
21 * -------------------------------------------------------------------------
22 */
23#include "postgres.h"
24
25/* These are always necessary for a bgworker */
26#include "miscadmin.h"
27#include "postmaster/bgworker.h"
29#include "storage/latch.h"
30
31/* these headers are used by this particular worker's code */
32#include "access/xact.h"
33#include "catalog/pg_database.h"
34#include "executor/spi.h"
35#include "fmgr.h"
36#include "lib/stringinfo.h"
37#include "pgstat.h"
38#include "tcop/utility.h"
39#include "utils/acl.h"
40#include "utils/builtins.h"
41#include "utils/snapmgr.h"
42#include "utils/wait_event.h"
43
45
47
49
50/* GUC variables */
51static int worker_spi_naptime = 10;
54static char *worker_spi_role = NULL;
55
56/* value cached, fetched from shared memory */
58
/*
 * Names of the schema and table a worker operates on.
 *
 * NOTE(review): the closing line of this definition is not present in this
 * listing (its line numbers skip from 62 to 64).
 */
59typedef struct worktable
60{
61 const char *schema; /* schema name; matched against pg_namespace.nspname and used to qualify the table */
62 const char *name; /* name of the table inside that schema */
64
65/*
66 * Initialize workspace for a worker process: create the schema if it doesn't
67 * already exist.
 *
 * The visible code counts the pg_namespace rows whose nspname equals
 * table->schema.  When that count is zero it runs one CREATE SCHEMA
 * statement that also creates table->name and a unique index on (type)
 * restricted to rows where type = 'total'.  Every unexpected SPI result is
 * reported at FATAL level.
 *
 * NOTE(review): this listing has gaps (its line numbers skip, e.g. 69 -> 71
 * and 76 -> 81), so the function name line, the declaration and setup of
 * "buf", and the statement that assigns "ntup" are not visible here.
68 */
69static void
71{
72 int ret;
73 int ntup;
74 bool isnull;
76
81 pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
82
83 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
/* The schema name is compared as a string literal, so it is used unquoted here. */
85 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
86 table->schema);
87
/* Second argument is read_only = true: this lookup modifies nothing. */
89 ret = SPI_execute(buf.data, true, 0);
90 if (ret != SPI_OK_SELECT)
91 elog(FATAL, "SPI_execute failed: error code %d", ret);
92
/* count(*) must come back as exactly one row. */
93 if (SPI_processed != 1)
94 elog(FATAL, "not a singleton result");
95
98 1, &isnull));
99 if (isnull)
100 elog(FATAL, "null result");
101
/* No schema of that name yet: create it together with its table and index. */
102 if (ntup == 0)
103 {
107 "CREATE SCHEMA \"%s\" "
108 "CREATE TABLE \"%s\" ("
109 " type text CHECK (type IN ('total', 'delta')), "
110 " value integer)"
111 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
112 "WHERE type = 'total'",
113 table->schema, table->name, table->name, table->name);
/*
 * The format string itself wraps each name in double quotes, so the names
 * must be passed in without quoting.
 */
114
115 /* set statement start time */
117
118 debug_query_string = buf.data;
/* read_only = false: this statement creates objects. */
119 ret = SPI_execute(buf.data, false, 0);
120
121 if (ret != SPI_OK_UTILITY)
122 elog(FATAL, "failed to create my schema");
123
124 debug_query_string = NULL; /* rest is not statement-specific */
125 }
126
127 SPI_finish();
132}
133
/*
 * Main function of a worker_spi background worker; this is the function
 * named in bgw_function_name by _PG_init() and worker_spi_launch().
 *
 * As far as this listing shows, the worker builds its schema name from
 * "index" ("schema%d") and always uses the table name "counted", reads a
 * database OID, a role OID and connection flags from the buffer "p",
 * connects to the database, and then loops forever, running one aggregation
 * statement per iteration inside its own SPI session.
 *
 * NOTE(review): this listing has gaps (its line numbers skip), so the
 * function name and parameter, the setup of "table", "index", "p" and "buf",
 * the signal-handler installation, the latch wait and the transaction
 * start/commit calls are missing or only partly visible.
 */
134void
136{
140 char name[20];
141 Oid dboid;
142 Oid roleoid;
143 char *p;
144 bits32 flags = 0;
145
/*
 * NOTE(review): "schema" plus a decimal number fits in name[20] assuming
 * index is a 32-bit integer -- its declaration is not shown, confirm.
 */
147 sprintf(name, "schema%d", index);
148 table->schema = pstrdup(name);
149 table->name = pstrdup("counted");
150
151 /* fetch database and role OIDs, these are set for a dynamic worker */
/*
 * The layout read here (Oid, Oid, bits32, back to back) is the one
 * worker_spi_launch() writes into bgw_extra.  NOTE(review): the statement
 * that sets "p" is not shown; presumably it points at bgw_extra -- confirm.
 */
153 memcpy(&dboid, p, sizeof(Oid));
154 p += sizeof(Oid);
155 memcpy(&roleoid, p, sizeof(Oid));
156 p += sizeof(Oid);
157 memcpy(&flags, p, sizeof(bits32));
158
159 /* Establish signal handlers before unblocking signals. */
162
163 /* We're now ready to receive signals */
165
166 /* Connect to our database */
/* A valid database OID selects connection by OID; otherwise connect by name. */
167 if (OidIsValid(dboid))
168 BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
169 else
171 worker_spi_role, flags);
172
173 elog(LOG, "%s initialized with %s.%s",
174 MyBgworkerEntry->bgw_name, table->schema, table->name);
176
177 /*
178 * Quote identifiers passed to us. Note that this must be done after
179 * initialize_worker_spi, because that routine assumes the names are not
180 * quoted.
181 *
182 * Note some memory might be leaked here.
183 */
184 table->schema = quote_identifier(table->schema);
185 table->name = quote_identifier(table->name);
/*
 * Build the aggregation statement: delete every 'delta' row, sum the deleted
 * values (0 when there are none), add that sum to the 'total' row, and
 * return the updated total.
 */
186
189 "WITH deleted AS (DELETE "
190 "FROM %s.%s "
191 "WHERE type = 'delta' RETURNING value), "
192 "total AS (SELECT coalesce(sum(value), 0) as sum "
193 "FROM deleted) "
194 "UPDATE %s.%s "
195 "SET value = %s.value + total.sum "
196 "FROM total WHERE type = 'total' "
197 "RETURNING %s.value",
198 table->schema, table->name,
199 table->schema, table->name,
200 table->name,
201 table->name);
202
203 /*
204 * Main loop: do this until SIGTERM is received and processed by
205 * ProcessInterrupts.
206 */
207 for (;;)
208 {
209 int ret;
210
211 /* First time, allocate or get the custom wait event */
214
215 /*
216 * Background workers mustn't call usleep() or any direct equivalent:
217 * instead, they may wait on their process latch, which sleeps as
218 * necessary, but is awakened if postmaster dies. That way the
219 * background process goes away immediately in an emergency.
220 */
223 worker_spi_naptime * 1000L,
226
228
229 /*
230 * In case of a SIGHUP, just reload the configuration.
231 */
233 {
234 ConfigReloadPending = false;
236 }
237
238 /*
239 * Start a transaction on which we can run queries. Note that each
240 * StartTransactionCommand() call should be preceded by a
241 * SetCurrentStatementStartTimestamp() call, which sets both the time
242 * for the statement we're about to run, and also the transaction
243 * start time. Also, each other query sent to SPI should probably be
244 * preceded by SetCurrentStatementStartTimestamp(), so that statement
245 * start time is always up to date.
246 *
247 * The SPI_connect() call lets us run queries through the SPI manager,
248 * and the PushActiveSnapshot() call creates an "active" snapshot
249 * which is necessary for queries to have MVCC data to work on.
250 *
251 * The pgstat_report_activity() call makes our activity visible
252 * through the pgstat views.
253 */
256 SPI_connect();
258 debug_query_string = buf.data;
260
261 /* We can now execute queries via SPI */
262 ret = SPI_execute(buf.data, false, 0);
263
/*
 * The statement is an UPDATE ... RETURNING, so that is the only result code
 * accepted here.
 */
264 if (ret != SPI_OK_UPDATE_RETURNING)
265 elog(FATAL, "cannot select from table %s.%s: error code %d",
266 table->schema, table->name, ret);
267
/*
 * Log the returned value when at least one row came back.  NOTE(review):
 * the statement that assigns "val" is only partly visible in this listing.
 */
268 if (SPI_processed > 0)
269 {
270 bool isnull;
271 int32 val;
272
275 1, &isnull));
276 if (!isnull)
277 elog(LOG, "%s: count in %s.%s is now %d",
279 table->schema, table->name, val);
280 }
281
282 /*
283 * And finish our transaction.
284 */
285 SPI_finish();
289 pgstat_report_stat(true);
291 }
292
293 /* Not reachable */
294}
295
296/*
297 * Entrypoint of this module.
298 *
299 * We register more than one worker process here, to demonstrate how that can
300 * be done.
 *
 * Defines the worker_spi.naptime, worker_spi.database, worker_spi.role and
 * worker_spi.total_workers configuration parameters, reserves the
 * "worker_spi" GUC prefix, and fills in one BackgroundWorker entry per
 * worker, numbered from 1 to worker_spi_total_workers.
 *
 * NOTE(review): this listing has gaps (its line numbers skip), so the
 * function name line, the variable-address and context arguments of the
 * GUC definitions, the test guarding the early return, the remaining
 * BackgroundWorker field assignments and the registration call inside the
 * loop are not visible.
301 */
302void
304{
305 BackgroundWorker worker;
306
307 /* get the configuration */
308
309 /*
310 * These GUCs are defined even if this library is not loaded with
311 * shared_preload_libraries, for worker_spi_launch().
312 */
313 DefineCustomIntVariable("worker_spi.naptime",
314 "Duration between each check (in seconds).",
315 NULL,
317 10, /* boot (default) value */
318 1, /* minimum */
319 INT_MAX, /* maximum */
321 0,
322 NULL,
323 NULL,
324 NULL);
325
326 DefineCustomStringVariable("worker_spi.database",
327 "Database to connect to.",
328 NULL,
330 "postgres", /* boot (default) value */
332 0,
333 NULL, NULL, NULL);
334
335 DefineCustomStringVariable("worker_spi.role",
336 "Role to connect with.",
337 NULL,
339 NULL, /* boot (default) value: no role configured */
341 0,
342 NULL, NULL, NULL);
343
/*
 * NOTE(review): the condition guarding this return (listing line 344) is
 * not shown.  Given the comment above the first GUC, it presumably applies
 * when the library is not being loaded through shared_preload_libraries --
 * confirm against the full source.
 */
345 return;
346
347 DefineCustomIntVariable("worker_spi.total_workers",
348 "Number of workers.",
349 NULL,
351 2, /* boot (default) value */
352 1, /* minimum */
353 100, /* maximum */
355 0,
356 NULL,
357 NULL,
358 NULL);
359
360 MarkGUCPrefixReserved("worker_spi");
361
362 /* set up common data for all our workers */
363 memset(&worker, 0, sizeof(worker));
368 sprintf(worker.bgw_library_name, "worker_spi");
369 sprintf(worker.bgw_function_name, "worker_spi_main");
370 worker.bgw_notify_pid = 0;
371
372 /*
373 * Now fill in worker-specific data, and do the actual registrations.
374 *
375 * bgw_extra can optionally include a database OID, a role OID and a set
376 * of flags. This is left empty here to fallback to the related GUCs at
377 * startup (0 for the bgworker flags).
378 */
379 for (int i = 1; i <= worker_spi_total_workers; i++)
380 {
381 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
382 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
383 worker.bgw_main_arg = Int32GetDatum(i); /* 1-based worker number */
384
386 }
387}
388
389/*
390 * Dynamically launch an SPI worker.
 *
 * SQL-callable function.  Argument 1 is a database OID and argument 2 a
 * role OID; a flags array of text values is also processed.  The database
 * OID, role OID and flag bits are packed into bgw_extra, the worker is
 * registered dynamically, and the caller waits for it to start.
 *
 * Returns the PID of the started worker as an int32.  Errors are reported
 * when the flags array has more than one dimension, contains nulls or holds
 * an unrecognized value, when the worker stops during startup, or when the
 * postmaster has died.
 *
 * NOTE(review): this listing has gaps (its line numbers skip), so the
 * function name line, the declarations of "i", "interruptible", "arr",
 * "handle" and "datum_flags", the error levels and codes, the flag bits set
 * for each option, and the action taken when registration fails are not
 * visible.
391 */
392Datum
394{
396 Oid dboid = PG_GETARG_OID(1);
397 Oid roleoid = PG_GETARG_OID(2);
398 BackgroundWorker worker;
400 BgwHandleStatus status;
401 pid_t pid;
402 char *p;
403 bits32 flags = 0;
405 Size ndim;
406 int nelems;
409
410 memset(&worker, 0, sizeof(worker));
413
414 if (interruptible)
416
419 sprintf(worker.bgw_library_name, "worker_spi");
420 sprintf(worker.bgw_function_name, "worker_spi_main");
421 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
422 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
423 worker.bgw_main_arg = Int32GetDatum(i);
424 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
425 worker.bgw_notify_pid = MyProcPid;
426
427 /* extract flags, if any */
428 ndim = ARR_NDIM(arr);
429 if (ndim > 1)
432 errmsg("flags array must be one-dimensional")));
433
434 if (array_contains_nulls(arr))
437 errmsg("flags array must not contain nulls")));
438
439 Assert(ARR_ELEMTYPE(arr) == TEXTOID);
/*
 * The only recognized flag names are "ALLOWCONN" and "ROLELOGINCHECK"
 * (compared case-sensitively); anything else is rejected.
 */
441
442 for (i = 0; i < nelems; i++)
443 {
444 char *optname = TextDatumGetCString(datum_flags[i]);
445
446 if (strcmp(optname, "ALLOWCONN") == 0)
448 else if (strcmp(optname, "ROLELOGINCHECK") == 0)
450 else
453 errmsg("incorrect flag value found in array")));
454 }
455
456 /*
457 * Register database and role to use for the worker started in bgw_extra.
458 * If none have been provided, this will fall back to the GUCs at startup.
459 */
460 if (!OidIsValid(dboid))
462
463 /*
464 * worker_spi_role is NULL by default, so this gives to worker_spi_main()
465 * an invalid OID in this case.
 *
 * get_role_oid() is called with missing_ok = false here.
466 */
467 if (!OidIsValid(roleoid) && worker_spi_role)
468 roleoid = get_role_oid(worker_spi_role, false);
469
/*
 * Pack dboid, roleoid and flags back to back into bgw_extra;
 * worker_spi_main() reads its three values in this same order.
 */
470 p = worker.bgw_extra;
471 memcpy(p, &dboid, sizeof(Oid));
472 p += sizeof(Oid);
473 memcpy(p, &roleoid, sizeof(Oid));
474 p += sizeof(Oid);
475 memcpy(p, &flags, sizeof(bits32));
476
477 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
479
/* Wait for the worker to start; "pid" receives its process ID. */
480 status = WaitForBackgroundWorkerStartup(handle, &pid);
481
482 if (status == BGWH_STOPPED)
485 errmsg("could not start background process"),
486 errhint("More details may be available in the server log.")));
487 if (status == BGWH_POSTMASTER_DIED)
490 errmsg("cannot start background processes without postmaster"),
491 errhint("Kill all remaining database processes and restart the database.")));
492 Assert(status == BGWH_STARTED);
493
494 PG_RETURN_INT32(pid);
495}
Oid get_role_oid(const char *rolname, bool missing_ok)
Definition acl.c:5605
#define ARR_NDIM(a)
Definition array.h:290
#define PG_GETARG_ARRAYTYPE_P(n)
Definition array.h:263
#define ARR_ELEMTYPE(a)
Definition array.h:292
bool array_contains_nulls(const ArrayType *array)
void deconstruct_array_builtin(const ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition bgworker.c:947
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition bgworker.c:1220
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition bgworker.c:860
void BackgroundWorkerUnblockSignals(void)
Definition bgworker.c:934
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition bgworker.c:894
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition bgworker.c:1053
#define BGW_NEVER_RESTART
Definition bgworker.h:92
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition bgworker.h:167
#define BGWORKER_INTERRUPTIBLE
Definition bgworker.h:67
BgwHandleStatus
Definition bgworker.h:111
@ BGWH_POSTMASTER_DIED
Definition bgworker.h:115
@ BGWH_STARTED
Definition bgworker.h:112
@ BGWH_STOPPED
Definition bgworker.h:114
@ BgWorkerStart_RecoveryFinished
Definition bgworker.h:88
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition bgworker.h:60
#define BGWORKER_BYPASS_ALLOWCONN
Definition bgworker.h:166
#define BGWORKER_SHMEM_ACCESS
Definition bgworker.h:53
#define BGW_MAXLEN
Definition bgworker.h:93
#define TextDatumGetCString(d)
Definition builtins.h:99
#define pg_noreturn
Definition c.h:184
#define Assert(condition)
Definition c.h:945
#define PGDLLEXPORT
Definition c.h:1438
uint32 bits32
Definition c.h:627
int32_t int32
Definition c.h:614
uint32_t uint32
Definition c.h:618
#define OidIsValid(objectId)
Definition c.h:860
size_t Size
Definition c.h:691
Oid get_database_oid(const char *dbname, bool missing_ok)
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:31
int errhint(const char *fmt,...) pg_attribute_printf(1
#define FATAL
Definition elog.h:41
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define palloc_object(type)
Definition fe_memutils.h:74
#define PG_GETARG_OID(n)
Definition fmgr.h:275
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_FUNCTION_INFO_V1(funcname)
Definition fmgr.h:417
#define PG_RETURN_INT32(x)
Definition fmgr.h:355
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
int MyProcPid
Definition globals.c:47
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition guc.c:5123
void MarkGUCPrefixReserved(const char *className)
Definition guc.c:5180
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition guc.c:5067
@ PGC_POSTMASTER
Definition guc.h:74
@ PGC_SIGHUP
Definition guc.h:75
long val
Definition informix.c:689
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
char * pstrdup(const char *in)
Definition mcxt.c:1781
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
bool process_shared_preload_libraries_in_progress
Definition miscinit.c:1787
static char * errmsg
static const struct lconv_member_info table[]
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define die(msg)
long pgstat_report_stat(bool force)
Definition pgstat.c:704
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
const char * debug_query_string
Definition postgres.c:91
static int64 DatumGetInt64(Datum X)
Definition postgres.h:403
uint64_t Datum
Definition postgres.h:70
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
unsigned int Oid
BackgroundWorker * MyBgworkerEntry
Definition postmaster.c:200
static int fb(int x)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
uint64 SPI_processed
Definition spi.c:45
SPITupleTable * SPI_tuptable
Definition spi.c:46
int SPI_connect(void)
Definition spi.c:95
int SPI_finish(void)
Definition spi.c:183
int SPI_execute(const char *src, bool read_only, long tcount)
Definition spi.c:597
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition spi.c:1253
#define SPI_OK_UTILITY
Definition spi.h:85
#define SPI_OK_UPDATE_RETURNING
Definition spi.h:94
#define SPI_OK_SELECT
Definition spi.h:86
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
char bgw_function_name[BGW_MAXLEN]
Definition bgworker.h:104
char bgw_name[BGW_MAXLEN]
Definition bgworker.h:98
char bgw_type[BGW_MAXLEN]
Definition bgworker.h:99
BgWorkerStartTime bgw_start_time
Definition bgworker.h:101
char bgw_extra[BGW_EXTRALEN]
Definition bgworker.h:106
pid_t bgw_notify_pid
Definition bgworker.h:107
char bgw_library_name[MAXPGPATH]
Definition bgworker.h:103
TupleDesc tupdesc
Definition spi.h:25
HeapTuple * vals
Definition spi.h:26
Definition type.h:96
const char * name
Definition worker_spi.c:62
const char * schema
Definition worker_spi.c:61
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition wait_event.c:163
const char * name
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
static int worker_spi_naptime
Definition worker_spi.c:51
static void initialize_worker_spi(worktable *table)
Definition worker_spi.c:70
void _PG_init(void)
Definition worker_spi.c:303
PG_MODULE_MAGIC
Definition worker_spi.c:44
Datum worker_spi_launch(PG_FUNCTION_ARGS)
Definition worker_spi.c:393
static uint32 worker_spi_wait_event_main
Definition worker_spi.c:57
static char * worker_spi_database
Definition worker_spi.c:53
static int worker_spi_total_workers
Definition worker_spi.c:52
PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg)
Definition worker_spi.c:135
static char * worker_spi_role
Definition worker_spi.c:54
void StartTransactionCommand(void)
Definition xact.c:3081
void SetCurrentStatementStartTimestamp(void)
Definition xact.c:916
void CommitTransactionCommand(void)
Definition xact.c:3179