PostgreSQL Source Code  git master
worker_spi.c
Go to the documentation of this file.
1 /* -------------------------------------------------------------------------
2  *
3  * worker_spi.c
4  * Sample background worker code that demonstrates various coding
5  * patterns: establishing a database connection; starting and committing
6  * transactions; using GUC variables, and heeding SIGHUP to reread
7  * the configuration file; reporting to pg_stat_activity; using the
8  * process latch to sleep and exit in case of postmaster death.
9  *
10  * This code connects to a database, creates a schema and table, and summarizes
11  * the numbers contained therein. To see it working, insert an initial value
12  * with "total" type and some initial value; then insert some other rows with
13  * "delta" type. Delta rows will be deleted by this worker and their values
14  * aggregated into the total.
15  *
16  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
17  *
18  * IDENTIFICATION
19  * src/test/modules/worker_spi/worker_spi.c
20  *
21  * -------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24 
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "postmaster/interrupt.h"
29 #include "storage/ipc.h"
30 #include "storage/latch.h"
31 #include "storage/lwlock.h"
32 #include "storage/proc.h"
33 #include "storage/shmem.h"
34 
35 /* these headers are used by this particular worker's code */
36 #include "access/xact.h"
37 #include "executor/spi.h"
38 #include "fmgr.h"
39 #include "lib/stringinfo.h"
40 #include "pgstat.h"
41 #include "utils/builtins.h"
42 #include "utils/snapmgr.h"
43 #include "tcop/utility.h"
44 
46 
48 
50 
51 /* GUC variables */
52 static int worker_spi_naptime = 10;
53 static int worker_spi_total_workers = 2;
54 static char *worker_spi_database = NULL;
55 
56 /* value cached, fetched from shared memory */
58 
59 typedef struct worktable
60 {
61  const char *schema;
62  const char *name;
64 
65 /*
66  * Initialize workspace for a worker process: create the schema if it doesn't
67  * already exist.
68  */
69 static void
71 {
72  int ret;
73  int ntup;
74  bool isnull;
76 
79  SPI_connect();
81  pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
82 
83  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
85  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
86  table->schema);
87 
88  debug_query_string = buf.data;
89  ret = SPI_execute(buf.data, true, 0);
90  if (ret != SPI_OK_SELECT)
91  elog(FATAL, "SPI_execute failed: error code %d", ret);
92 
93  if (SPI_processed != 1)
94  elog(FATAL, "not a singleton result");
95 
98  1, &isnull));
99  if (isnull)
100  elog(FATAL, "null result");
101 
102  if (ntup == 0)
103  {
104  debug_query_string = NULL;
107  "CREATE SCHEMA \"%s\" "
108  "CREATE TABLE \"%s\" ("
109  " type text CHECK (type IN ('total', 'delta')), "
110  " value integer)"
111  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
112  "WHERE type = 'total'",
113  table->schema, table->name, table->name, table->name);
114 
115  /* set statement start time */
117 
118  debug_query_string = buf.data;
119  ret = SPI_execute(buf.data, false, 0);
120 
121  if (ret != SPI_OK_UTILITY)
122  elog(FATAL, "failed to create my schema");
123 
124  debug_query_string = NULL; /* rest is not statement-specific */
125  }
126 
127  SPI_finish();
130  debug_query_string = NULL;
132 }
133 
134 void
136 {
137  int index = DatumGetInt32(main_arg);
138  worktable *table;
140  char name[20];
141 
142  table = palloc(sizeof(worktable));
143  sprintf(name, "schema%d", index);
144  table->schema = pstrdup(name);
145  table->name = pstrdup("counted");
146 
147  /* Establish signal handlers before unblocking signals. */
149  pqsignal(SIGTERM, die);
150 
151  /* We're now ready to receive signals */
153 
154  /* Connect to our database */
156 
157  elog(LOG, "%s initialized with %s.%s",
158  MyBgworkerEntry->bgw_name, table->schema, table->name);
159  initialize_worker_spi(table);
160 
161  /*
162  * Quote identifiers passed to us. Note that this must be done after
163  * initialize_worker_spi, because that routine assumes the names are not
164  * quoted.
165  *
166  * Note some memory might be leaked here.
167  */
168  table->schema = quote_identifier(table->schema);
169  table->name = quote_identifier(table->name);
170 
173  "WITH deleted AS (DELETE "
174  "FROM %s.%s "
175  "WHERE type = 'delta' RETURNING value), "
176  "total AS (SELECT coalesce(sum(value), 0) as sum "
177  "FROM deleted) "
178  "UPDATE %s.%s "
179  "SET value = %s.value + total.sum "
180  "FROM total WHERE type = 'total' "
181  "RETURNING %s.value",
182  table->schema, table->name,
183  table->schema, table->name,
184  table->name,
185  table->name);
186 
187  /*
188  * Main loop: do this until SIGTERM is received and processed by
189  * ProcessInterrupts.
190  */
191  for (;;)
192  {
193  int ret;
194 
195  /* First time, allocate or get the custom wait event */
198 
199  /*
200  * Background workers mustn't call usleep() or any direct equivalent:
201  * instead, they may wait on their process latch, which sleeps as
202  * necessary, but is awakened if postmaster dies. That way the
203  * background process goes away immediately in an emergency.
204  */
205  (void) WaitLatch(MyLatch,
207  worker_spi_naptime * 1000L,
210 
212 
213  /*
214  * In case of a SIGHUP, just reload the configuration.
215  */
217  {
218  ConfigReloadPending = false;
220  }
221 
222  /*
223  * Start a transaction on which we can run queries. Note that each
224  * StartTransactionCommand() call should be preceded by a
225  * SetCurrentStatementStartTimestamp() call, which sets both the time
226  * for the statement we're about the run, and also the transaction
227  * start time. Also, each other query sent to SPI should probably be
228  * preceded by SetCurrentStatementStartTimestamp(), so that statement
229  * start time is always up to date.
230  *
231  * The SPI_connect() call lets us run queries through the SPI manager,
232  * and the PushActiveSnapshot() call creates an "active" snapshot
233  * which is necessary for queries to have MVCC data to work on.
234  *
235  * The pgstat_report_activity() call makes our activity visible
236  * through the pgstat views.
237  */
240  SPI_connect();
242  debug_query_string = buf.data;
244 
245  /* We can now execute queries via SPI */
246  ret = SPI_execute(buf.data, false, 0);
247 
248  if (ret != SPI_OK_UPDATE_RETURNING)
249  elog(FATAL, "cannot select from table %s.%s: error code %d",
250  table->schema, table->name, ret);
251 
252  if (SPI_processed > 0)
253  {
254  bool isnull;
255  int32 val;
256 
259  1, &isnull));
260  if (!isnull)
261  elog(LOG, "%s: count in %s.%s is now %d",
263  table->schema, table->name, val);
264  }
265 
266  /*
267  * And finish our transaction.
268  */
269  SPI_finish();
272  debug_query_string = NULL;
273  pgstat_report_stat(true);
275  }
276 
277  /* Not reachable */
278 }
279 
280 /*
281  * Entrypoint of this module.
282  *
283  * We register more than one worker process here, to demonstrate how that can
284  * be done.
285  */
286 void
287 _PG_init(void)
288 {
289  BackgroundWorker worker;
290 
291  /* get the configuration */
292 
293  /*
294  * These GUCs are defined even if this library is not loaded with
295  * shared_preload_libraries, for worker_spi_launch().
296  */
297  DefineCustomIntVariable("worker_spi.naptime",
298  "Duration between each check (in seconds).",
299  NULL,
301  10,
302  1,
303  INT_MAX,
304  PGC_SIGHUP,
305  0,
306  NULL,
307  NULL,
308  NULL);
309 
310  DefineCustomStringVariable("worker_spi.database",
311  "Database to connect to.",
312  NULL,
314  "postgres",
315  PGC_SIGHUP,
316  0,
317  NULL, NULL, NULL);
318 
320  return;
321 
322  DefineCustomIntVariable("worker_spi.total_workers",
323  "Number of workers.",
324  NULL,
326  2,
327  1,
328  100,
330  0,
331  NULL,
332  NULL,
333  NULL);
334 
335  MarkGUCPrefixReserved("worker_spi");
336 
337  /* set up common data for all our workers */
338  memset(&worker, 0, sizeof(worker));
343  sprintf(worker.bgw_library_name, "worker_spi");
344  sprintf(worker.bgw_function_name, "worker_spi_main");
345  worker.bgw_notify_pid = 0;
346 
347  /*
348  * Now fill in worker-specific data, and do the actual registrations.
349  */
350  for (int i = 1; i <= worker_spi_total_workers; i++)
351  {
352  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
353  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
354  worker.bgw_main_arg = Int32GetDatum(i);
355 
356  RegisterBackgroundWorker(&worker);
357  }
358 }
359 
360 /*
361  * Dynamically launch an SPI worker.
362  */
363 Datum
365 {
366  int32 i = PG_GETARG_INT32(0);
367  BackgroundWorker worker;
368  BackgroundWorkerHandle *handle;
369  BgwHandleStatus status;
370  pid_t pid;
371 
372  memset(&worker, 0, sizeof(worker));
377  sprintf(worker.bgw_library_name, "worker_spi");
378  sprintf(worker.bgw_function_name, "worker_spi_main");
379  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
380  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
381  worker.bgw_main_arg = Int32GetDatum(i);
382  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
383  worker.bgw_notify_pid = MyProcPid;
384 
385  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
386  PG_RETURN_NULL();
387 
388  status = WaitForBackgroundWorkerStartup(handle, &pid);
389 
390  if (status == BGWH_STOPPED)
391  ereport(ERROR,
392  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
393  errmsg("could not start background process"),
394  errhint("More details may be available in the server log.")));
395  if (status == BGWH_POSTMASTER_DIED)
396  ereport(ERROR,
397  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
398  errmsg("cannot start background processes without postmaster"),
399  errhint("Kill all remaining database processes and restart the database.")));
400  Assert(status == BGWH_STARTED);
401 
402  PG_RETURN_INT32(pid);
403 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:878
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1129
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:962
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_POSTMASTER_DIED
Definition: bgworker.h:108
@ BGWH_STARTED
Definition: bgworker.h:105
@ BGWH_STOPPED
Definition: bgworker.h:107
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
unsigned int uint32
Definition: c.h:495
signed int int32
Definition: c.h:483
#define pg_attribute_noreturn()
Definition: c.h:206
#define PGDLLEXPORT
Definition: c.h:1341
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_INT32(x)
Definition: fmgr.h:354
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
int MyProcPid
Definition: globals.c:44
struct Latch * MyLatch
Definition: globals.c:58
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:5044
void MarkGUCPrefixReserved(const char *className)
Definition: guc.c:5105
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:4984
@ PGC_POSTMASTER
Definition: guc.h:70
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
long val
Definition: informix.c:664
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
int i
Definition: isn.c:73
void ResetLatch(Latch *latch)
Definition: latch.c:697
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:490
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
Assert(fmt[strlen(fmt) - 1] !='\n')
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void * palloc(Size size)
Definition: mcxt.c:1226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1764
#define die(msg)
Definition: pg_test_fsync.c:95
static char * buf
Definition: pg_test_fsync.c:67
long pgstat_report_stat(bool force)
Definition: pgstat.c:582
#define sprintf
Definition: port.h:240
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
const char * debug_query_string
Definition: postgres.c:85
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:385
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: postmaster.c:5578
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5638
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:194
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11965
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:197
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:629
void PopActiveSnapshot(void)
Definition: snapmgr.c:724
uint64 SPI_processed
Definition: spi.c:45
SPITupleTable * SPI_tuptable
Definition: spi.c:46
int SPI_connect(void)
Definition: spi.c:95
int SPI_finish(void)
Definition: spi.c:183
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:594
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:1250
#define SPI_OK_UTILITY
Definition: spi.h:85
#define SPI_OK_UPDATE_RETURNING
Definition: spi.h:94
#define SPI_OK_SELECT
Definition: spi.h:86
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: type.h:95
const char * name
Definition: worker_spi.c:62
const char * schema
Definition: worker_spi.c:61
uint32 WaitEventExtensionNew(const char *wait_event_name)
Definition: wait_event.c:164
const char * name
#define SIGHUP
Definition: win32_port.h:168
static int worker_spi_naptime
Definition: worker_spi.c:52
static void initialize_worker_spi(worktable *table)
Definition: worker_spi.c:70
void _PG_init(void)
Definition: worker_spi.c:287
PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn()
Definition: worker_spi.c:135
PG_MODULE_MAGIC
Definition: worker_spi.c:45
Datum worker_spi_launch(PG_FUNCTION_ARGS)
Definition: worker_spi.c:364
static uint32 worker_spi_wait_event_main
Definition: worker_spi.c:57
static char * worker_spi_database
Definition: worker_spi.c:54
static int worker_spi_total_workers
Definition: worker_spi.c:53
struct worktable worktable
PG_FUNCTION_INFO_V1(worker_spi_launch)
void StartTransactionCommand(void)
Definition: xact.c:2937
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:899
void CommitTransactionCommand(void)
Definition: xact.c:3034