PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
worker_spi.c
Go to the documentation of this file.
1/* -------------------------------------------------------------------------
2 *
3 * worker_spi.c
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
9 *
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
15 *
16 * Copyright (c) 2013-2024, PostgreSQL Global Development Group
17 *
18 * IDENTIFICATION
19 * src/test/modules/worker_spi/worker_spi.c
20 *
21 * -------------------------------------------------------------------------
22 */
23#include "postgres.h"
24
25/* These are always necessary for a bgworker */
26#include "miscadmin.h"
27#include "postmaster/bgworker.h"
29#include "storage/latch.h"
30
31/* these headers are used by this particular worker's code */
32#include "access/xact.h"
33#include "commands/dbcommands.h"
34#include "executor/spi.h"
35#include "fmgr.h"
36#include "lib/stringinfo.h"
37#include "pgstat.h"
38#include "tcop/utility.h"
39#include "utils/acl.h"
40#include "utils/builtins.h"
41#include "utils/snapmgr.h"
42
44
46
48
49/* GUC variables */
50static int worker_spi_naptime = 10;
52static char *worker_spi_database = NULL;
53static char *worker_spi_role = NULL;
54
55/* value cached, fetched from shared memory */
57
58typedef struct worktable
59{
60 const char *schema;
61 const char *name;
63
64/*
65 * Initialize workspace for a worker process: create the schema if it doesn't
66 * already exist.
67 */
68static void
70{
71 int ret;
72 int ntup;
73 bool isnull;
75
80 pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
81
82 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
84 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
85 table->schema);
86
88 ret = SPI_execute(buf.data, true, 0);
89 if (ret != SPI_OK_SELECT)
90 elog(FATAL, "SPI_execute failed: error code %d", ret);
91
92 if (SPI_processed != 1)
93 elog(FATAL, "not a singleton result");
94
97 1, &isnull));
98 if (isnull)
99 elog(FATAL, "null result");
100
101 if (ntup == 0)
102 {
103 debug_query_string = NULL;
106 "CREATE SCHEMA \"%s\" "
107 "CREATE TABLE \"%s\" ("
108 " type text CHECK (type IN ('total', 'delta')), "
109 " value integer)"
110 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
111 "WHERE type = 'total'",
112 table->schema, table->name, table->name, table->name);
113
114 /* set statement start time */
116
117 debug_query_string = buf.data;
118 ret = SPI_execute(buf.data, false, 0);
119
120 if (ret != SPI_OK_UTILITY)
121 elog(FATAL, "failed to create my schema");
122
123 debug_query_string = NULL; /* rest is not statement-specific */
124 }
125
126 SPI_finish();
129 debug_query_string = NULL;
131}
132
133void
135{
136 int index = DatumGetInt32(main_arg);
137 worktable *table;
139 char name[20];
140 Oid dboid;
141 Oid roleoid;
142 char *p;
143 bits32 flags = 0;
144
145 table = palloc(sizeof(worktable));
146 sprintf(name, "schema%d", index);
147 table->schema = pstrdup(name);
148 table->name = pstrdup("counted");
149
150 /* fetch database and role OIDs, these are set for a dynamic worker */
152 memcpy(&dboid, p, sizeof(Oid));
153 p += sizeof(Oid);
154 memcpy(&roleoid, p, sizeof(Oid));
155 p += sizeof(Oid);
156 memcpy(&flags, p, sizeof(bits32));
157
158 /* Establish signal handlers before unblocking signals. */
160 pqsignal(SIGTERM, die);
161
162 /* We're now ready to receive signals */
164
165 /* Connect to our database */
166 if (OidIsValid(dboid))
167 BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
168 else
170 worker_spi_role, flags);
171
172 elog(LOG, "%s initialized with %s.%s",
173 MyBgworkerEntry->bgw_name, table->schema, table->name);
175
176 /*
177 * Quote identifiers passed to us. Note that this must be done after
178 * initialize_worker_spi, because that routine assumes the names are not
179 * quoted.
180 *
181 * Note some memory might be leaked here.
182 */
183 table->schema = quote_identifier(table->schema);
184 table->name = quote_identifier(table->name);
185
188 "WITH deleted AS (DELETE "
189 "FROM %s.%s "
190 "WHERE type = 'delta' RETURNING value), "
191 "total AS (SELECT coalesce(sum(value), 0) as sum "
192 "FROM deleted) "
193 "UPDATE %s.%s "
194 "SET value = %s.value + total.sum "
195 "FROM total WHERE type = 'total' "
196 "RETURNING %s.value",
197 table->schema, table->name,
198 table->schema, table->name,
199 table->name,
200 table->name);
201
202 /*
203 * Main loop: do this until SIGTERM is received and processed by
204 * ProcessInterrupts.
205 */
206 for (;;)
207 {
208 int ret;
209
210 /* First time, allocate or get the custom wait event */
213
214 /*
215 * Background workers mustn't call usleep() or any direct equivalent:
216 * instead, they may wait on their process latch, which sleeps as
217 * necessary, but is awakened if postmaster dies. That way the
218 * background process goes away immediately in an emergency.
219 */
220 (void) WaitLatch(MyLatch,
222 worker_spi_naptime * 1000L,
225
227
228 /*
229 * In case of a SIGHUP, just reload the configuration.
230 */
232 {
233 ConfigReloadPending = false;
235 }
236
237 /*
238 * Start a transaction on which we can run queries. Note that each
239 * StartTransactionCommand() call should be preceded by a
240 * SetCurrentStatementStartTimestamp() call, which sets both the time
241 * for the statement we're about the run, and also the transaction
242 * start time. Also, each other query sent to SPI should probably be
243 * preceded by SetCurrentStatementStartTimestamp(), so that statement
244 * start time is always up to date.
245 *
246 * The SPI_connect() call lets us run queries through the SPI manager,
247 * and the PushActiveSnapshot() call creates an "active" snapshot
248 * which is necessary for queries to have MVCC data to work on.
249 *
250 * The pgstat_report_activity() call makes our activity visible
251 * through the pgstat views.
252 */
255 SPI_connect();
257 debug_query_string = buf.data;
259
260 /* We can now execute queries via SPI */
261 ret = SPI_execute(buf.data, false, 0);
262
263 if (ret != SPI_OK_UPDATE_RETURNING)
264 elog(FATAL, "cannot select from table %s.%s: error code %d",
265 table->schema, table->name, ret);
266
267 if (SPI_processed > 0)
268 {
269 bool isnull;
270 int32 val;
271
274 1, &isnull));
275 if (!isnull)
276 elog(LOG, "%s: count in %s.%s is now %d",
278 table->schema, table->name, val);
279 }
280
281 /*
282 * And finish our transaction.
283 */
284 SPI_finish();
287 debug_query_string = NULL;
288 pgstat_report_stat(true);
290 }
291
292 /* Not reachable */
293}
294
295/*
296 * Entrypoint of this module.
297 *
298 * We register more than one worker process here, to demonstrate how that can
299 * be done.
300 */
301void
303{
304 BackgroundWorker worker;
305
306 /* get the configuration */
307
308 /*
309 * These GUCs are defined even if this library is not loaded with
310 * shared_preload_libraries, for worker_spi_launch().
311 */
312 DefineCustomIntVariable("worker_spi.naptime",
313 "Duration between each check (in seconds).",
314 NULL,
316 10,
317 1,
318 INT_MAX,
320 0,
321 NULL,
322 NULL,
323 NULL);
324
325 DefineCustomStringVariable("worker_spi.database",
326 "Database to connect to.",
327 NULL,
329 "postgres",
331 0,
332 NULL, NULL, NULL);
333
334 DefineCustomStringVariable("worker_spi.role",
335 "Role to connect with.",
336 NULL,
338 NULL,
340 0,
341 NULL, NULL, NULL);
342
344 return;
345
346 DefineCustomIntVariable("worker_spi.total_workers",
347 "Number of workers.",
348 NULL,
350 2,
351 1,
352 100,
354 0,
355 NULL,
356 NULL,
357 NULL);
358
359 MarkGUCPrefixReserved("worker_spi");
360
361 /* set up common data for all our workers */
362 memset(&worker, 0, sizeof(worker));
367 sprintf(worker.bgw_library_name, "worker_spi");
368 sprintf(worker.bgw_function_name, "worker_spi_main");
369 worker.bgw_notify_pid = 0;
370
371 /*
372 * Now fill in worker-specific data, and do the actual registrations.
373 *
374 * bgw_extra can optionally include a database OID, a role OID and a set
375 * of flags. This is left empty here to fallback to the related GUCs at
376 * startup (0 for the bgworker flags).
377 */
378 for (int i = 1; i <= worker_spi_total_workers; i++)
379 {
380 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
381 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
382 worker.bgw_main_arg = Int32GetDatum(i);
383
385 }
386}
387
388/*
389 * Dynamically launch an SPI worker.
390 */
391Datum
393{
395 Oid dboid = PG_GETARG_OID(1);
396 Oid roleoid = PG_GETARG_OID(2);
397 BackgroundWorker worker;
399 BgwHandleStatus status;
400 pid_t pid;
401 char *p;
402 bits32 flags = 0;
404 Size ndim;
405 int nelems;
406 Datum *datum_flags;
407
408 memset(&worker, 0, sizeof(worker));
413 sprintf(worker.bgw_library_name, "worker_spi");
414 sprintf(worker.bgw_function_name, "worker_spi_main");
415 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
416 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
417 worker.bgw_main_arg = Int32GetDatum(i);
418 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
419 worker.bgw_notify_pid = MyProcPid;
420
421 /* extract flags, if any */
422 ndim = ARR_NDIM(arr);
423 if (ndim > 1)
425 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
426 errmsg("flags array must be one-dimensional")));
427
428 if (array_contains_nulls(arr))
430 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431 errmsg("flags array must not contain nulls")));
432
433 Assert(ARR_ELEMTYPE(arr) == TEXTOID);
434 deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
435
436 for (i = 0; i < nelems; i++)
437 {
438 char *optname = TextDatumGetCString(datum_flags[i]);
439
440 if (strcmp(optname, "ALLOWCONN") == 0)
442 else if (strcmp(optname, "ROLELOGINCHECK") == 0)
444 else
446 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
447 errmsg("incorrect flag value found in array")));
448 }
449
450 /*
451 * Register database and role to use for the worker started in bgw_extra.
452 * If none have been provided, this will fall back to the GUCs at startup.
453 */
454 if (!OidIsValid(dboid))
456
457 /*
458 * worker_spi_role is NULL by default, so this gives to worker_spi_main()
459 * an invalid OID in this case.
460 */
461 if (!OidIsValid(roleoid) && worker_spi_role)
462 roleoid = get_role_oid(worker_spi_role, false);
463
464 p = worker.bgw_extra;
465 memcpy(p, &dboid, sizeof(Oid));
466 p += sizeof(Oid);
467 memcpy(p, &roleoid, sizeof(Oid));
468 p += sizeof(Oid);
469 memcpy(p, &flags, sizeof(bits32));
470
471 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
473
474 status = WaitForBackgroundWorkerStartup(handle, &pid);
475
476 if (status == BGWH_STOPPED)
478 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
479 errmsg("could not start background process"),
480 errhint("More details may be available in the server log.")));
481 if (status == BGWH_POSTMASTER_DIED)
483 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
484 errmsg("cannot start background processes without postmaster"),
485 errhint("Kill all remaining database processes and restart the database.")));
486 Assert(status == BGWH_STARTED);
487
488 PG_RETURN_INT32(pid);
489}
Oid get_role_oid(const char *rolname, bool missing_ok)
Definition: acl.c:5554
#define ARR_NDIM(a)
Definition: array.h:290
#define PG_GETARG_ARRAYTYPE_P(n)
Definition: array.h:263
#define ARR_ELEMTYPE(a)
Definition: array.h:292
bool array_contains_nulls(ArrayType *array)
Definition: arrayfuncs.c:3767
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3697
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:939
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1212
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: bgworker.c:852
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:926
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
Definition: bgworker.c:886
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:1045
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
#define BGWORKER_BYPASS_ROLELOGINCHECK
Definition: bgworker.h:157
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_POSTMASTER_DIED
Definition: bgworker.h:108
@ BGWH_STARTED
Definition: bgworker.h:105
@ BGWH_STOPPED
Definition: bgworker.h:107
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_BYPASS_ALLOWCONN
Definition: bgworker.h:156
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define Assert(condition)
Definition: c.h:812
#define pg_attribute_noreturn()
Definition: c.h:236
#define PGDLLEXPORT
Definition: c.h:1289
uint32 bits32
Definition: c.h:494
int32_t int32
Definition: c.h:481
uint32_t uint32
Definition: c.h:485
#define OidIsValid(objectId)
Definition: c.h:729
size_t Size
Definition: c.h:559
Oid get_database_oid(const char *dbname, bool missing_ok)
Definition: dbcommands.c:3140
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_INT32(x)
Definition: fmgr.h:354
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
int MyProcPid
Definition: globals.c:46
struct Latch * MyLatch
Definition: globals.c:62
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:5218
void MarkGUCPrefixReserved(const char *className)
Definition: guc.c:5279
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:5158
@ PGC_POSTMASTER
Definition: guc.h:70
@ PGC_SIGHUP
Definition: guc.h:71
long val
Definition: informix.c:689
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
int i
Definition: isn.c:72
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1834
#define die(msg)
static char * buf
Definition: pg_test_fsync.c:72
long pgstat_report_stat(bool force)
Definition: pgstat.c:692
#define sprintf
Definition: port.h:240
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
const char * debug_query_string
Definition: postgres.c:87
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:385
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
unsigned int Oid
Definition: postgres_ext.h:31
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:192
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12870
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:212
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:610
void PopActiveSnapshot(void)
Definition: snapmgr.c:703
uint64 SPI_processed
Definition: spi.c:44
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_connect(void)
Definition: spi.c:94
int SPI_finish(void)
Definition: spi.c:182
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:596
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:1252
#define SPI_OK_UTILITY
Definition: spi.h:85
#define SPI_OK_UPDATE_RETURNING
Definition: spi.h:94
#define SPI_OK_SELECT
Definition: spi.h:86
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:94
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: type.h:96
const char * name
Definition: worker_spi.c:61
const char * schema
Definition: worker_spi.c:60
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:163
const char * name
#define SIGHUP
Definition: win32_port.h:168
static int worker_spi_naptime
Definition: worker_spi.c:50
static void initialize_worker_spi(worktable *table)
Definition: worker_spi.c:69
void _PG_init(void)
Definition: worker_spi.c:302
PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn()
Definition: worker_spi.c:134
PG_MODULE_MAGIC
Definition: worker_spi.c:43
Datum worker_spi_launch(PG_FUNCTION_ARGS)
Definition: worker_spi.c:392
static uint32 worker_spi_wait_event_main
Definition: worker_spi.c:56
static char * worker_spi_database
Definition: worker_spi.c:52
static int worker_spi_total_workers
Definition: worker_spi.c:51
struct worktable worktable
static char * worker_spi_role
Definition: worker_spi.c:53
PG_FUNCTION_INFO_V1(worker_spi_launch)
void StartTransactionCommand(void)
Definition: xact.c:3051
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:913
void CommitTransactionCommand(void)
Definition: xact.c:3149