PostgreSQL Source Code  git master
worker_spi.c
Go to the documentation of this file.
1 /* -------------------------------------------------------------------------
2  *
3  * worker_spi.c
4  * Sample background worker code that demonstrates various coding
5  * patterns: establishing a database connection; starting and committing
6  * transactions; using GUC variables, and heeding SIGHUP to reread
7  * the configuration file; reporting to pg_stat_activity; using the
8  * process latch to sleep and exit in case of postmaster death.
9  *
10  * This code connects to a database, creates a schema and table, and summarizes
11  * the numbers contained therein. To see it working, insert an initial value
12  * with "total" type and some initial value; then insert some other rows with
13  * "delta" type. Delta rows will be deleted by this worker and their values
14  * aggregated into the total.
15  *
16  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
17  *
18  * IDENTIFICATION
19  * src/test/modules/worker_spi/worker_spi.c
20  *
21  * -------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24 
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33 
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43 
45 
47 
48 void _PG_init(void);
50 
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54 
55 /* GUC variables */
56 static int worker_spi_naptime = 10;
57 static int worker_spi_total_workers = 2;
58 static char *worker_spi_database = NULL;
59 
60 
61 typedef struct worktable
62 {
63  const char *schema;
64  const char *name;
66 
67 /*
68  * Signal handler for SIGTERM
69  * Set a flag to let the main loop to terminate, and set our latch to wake
70  * it up.
71  */
72 static void
74 {
75  int save_errno = errno;
76 
77  got_sigterm = true;
79 
80  errno = save_errno;
81 }
82 
83 /*
84  * Signal handler for SIGHUP
85  * Set a flag to tell the main loop to reread the config file, and set
86  * our latch to wake it up.
87  */
88 static void
90 {
91  int save_errno = errno;
92 
93  got_sighup = true;
95 
96  errno = save_errno;
97 }
98 
99 /*
100  * Initialize workspace for a worker process: create the schema if it doesn't
101  * already exist.
102  */
103 static void
105 {
106  int ret;
107  int ntup;
108  bool isnull;
110 
113  SPI_connect();
115  pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
116 
117  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
118  initStringInfo(&buf);
119  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
120  table->schema);
121 
122  ret = SPI_execute(buf.data, true, 0);
123  if (ret != SPI_OK_SELECT)
124  elog(FATAL, "SPI_execute failed: error code %d", ret);
125 
126  if (SPI_processed != 1)
127  elog(FATAL, "not a singleton result");
128 
131  1, &isnull));
132  if (isnull)
133  elog(FATAL, "null result");
134 
135  if (ntup == 0)
136  {
137  resetStringInfo(&buf);
138  appendStringInfo(&buf,
139  "CREATE SCHEMA \"%s\" "
140  "CREATE TABLE \"%s\" ("
141  " type text CHECK (type IN ('total', 'delta')), "
142  " value integer)"
143  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
144  "WHERE type = 'total'",
145  table->schema, table->name, table->name, table->name);
146 
147  /* set statement start time */
149 
150  ret = SPI_execute(buf.data, false, 0);
151 
152  if (ret != SPI_OK_UTILITY)
153  elog(FATAL, "failed to create my schema");
154  }
155 
156  SPI_finish();
160 }
161 
162 void
163 worker_spi_main(Datum main_arg)
164 {
165  int index = DatumGetInt32(main_arg);
166  worktable *table;
168  char name[20];
169 
170  table = palloc(sizeof(worktable));
171  sprintf(name, "schema%d", index);
172  table->schema = pstrdup(name);
173  table->name = pstrdup("counted");
174 
175  /* Establish signal handlers before unblocking signals. */
177  pqsignal(SIGTERM, worker_spi_sigterm);
178 
179  /* We're now ready to receive signals */
181 
182  /* Connect to our database */
183  BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
184 
185  elog(LOG, "%s initialized with %s.%s",
186  MyBgworkerEntry->bgw_name, table->schema, table->name);
187  initialize_worker_spi(table);
188 
189  /*
190  * Quote identifiers passed to us. Note that this must be done after
191  * initialize_worker_spi, because that routine assumes the names are not
192  * quoted.
193  *
194  * Note some memory might be leaked here.
195  */
196  table->schema = quote_identifier(table->schema);
197  table->name = quote_identifier(table->name);
198 
199  initStringInfo(&buf);
200  appendStringInfo(&buf,
201  "WITH deleted AS (DELETE "
202  "FROM %s.%s "
203  "WHERE type = 'delta' RETURNING value), "
204  "total AS (SELECT coalesce(sum(value), 0) as sum "
205  "FROM deleted) "
206  "UPDATE %s.%s "
207  "SET value = %s.value + total.sum "
208  "FROM total WHERE type = 'total' "
209  "RETURNING %s.value",
210  table->schema, table->name,
211  table->schema, table->name,
212  table->name,
213  table->name);
214 
215  /*
216  * Main loop: do this until the SIGTERM handler tells us to terminate
217  */
218  while (!got_sigterm)
219  {
220  int ret;
221 
222  /*
223  * Background workers mustn't call usleep() or any direct equivalent:
224  * instead, they may wait on their process latch, which sleeps as
225  * necessary, but is awakened if postmaster dies. That way the
226  * background process goes away immediately in an emergency.
227  */
228  (void) WaitLatch(MyLatch,
230  worker_spi_naptime * 1000L,
233 
235 
236  /*
237  * In case of a SIGHUP, just reload the configuration.
238  */
239  if (got_sighup)
240  {
241  got_sighup = false;
243  }
244 
245  /*
246  * Start a transaction on which we can run queries. Note that each
247  * StartTransactionCommand() call should be preceded by a
248  * SetCurrentStatementStartTimestamp() call, which sets both the time
249  * for the statement we're about the run, and also the transaction
250  * start time. Also, each other query sent to SPI should probably be
251  * preceded by SetCurrentStatementStartTimestamp(), so that statement
252  * start time is always up to date.
253  *
254  * The SPI_connect() call lets us run queries through the SPI manager,
255  * and the PushActiveSnapshot() call creates an "active" snapshot
256  * which is necessary for queries to have MVCC data to work on.
257  *
258  * The pgstat_report_activity() call makes our activity visible
259  * through the pgstat views.
260  */
263  SPI_connect();
266 
267  /* We can now execute queries via SPI */
268  ret = SPI_execute(buf.data, false, 0);
269 
270  if (ret != SPI_OK_UPDATE_RETURNING)
271  elog(FATAL, "cannot select from table %s.%s: error code %d",
272  table->schema, table->name, ret);
273 
274  if (SPI_processed > 0)
275  {
276  bool isnull;
277  int32 val;
278 
281  1, &isnull));
282  if (!isnull)
283  elog(LOG, "%s: count in %s.%s is now %d",
285  table->schema, table->name, val);
286  }
287 
288  /*
289  * And finish our transaction.
290  */
291  SPI_finish();
294  pgstat_report_stat(false);
296  }
297 
298  proc_exit(1);
299 }
300 
301 /*
302  * Entrypoint of this module.
303  *
304  * We register more than one worker process here, to demonstrate how that can
305  * be done.
306  */
307 void
308 _PG_init(void)
309 {
310  BackgroundWorker worker;
311  unsigned int i;
312 
313  /* get the configuration */
314  DefineCustomIntVariable("worker_spi.naptime",
315  "Duration between each check (in seconds).",
316  NULL,
317  &worker_spi_naptime,
318  10,
319  1,
320  INT_MAX,
321  PGC_SIGHUP,
322  0,
323  NULL,
324  NULL,
325  NULL);
326 
328  return;
329 
330  DefineCustomIntVariable("worker_spi.total_workers",
331  "Number of workers.",
332  NULL,
333  &worker_spi_total_workers,
334  2,
335  1,
336  100,
338  0,
339  NULL,
340  NULL,
341  NULL);
342 
343  DefineCustomStringVariable("worker_spi.database",
344  "Database to connect to.",
345  NULL,
346  &worker_spi_database,
347  "postgres",
349  0,
350  NULL, NULL, NULL);
351 
352  /* set up common data for all our workers */
353  memset(&worker, 0, sizeof(worker));
358  sprintf(worker.bgw_library_name, "worker_spi");
359  sprintf(worker.bgw_function_name, "worker_spi_main");
360  worker.bgw_notify_pid = 0;
361 
362  /*
363  * Now fill in worker-specific data, and do the actual registrations.
364  */
365  for (i = 1; i <= worker_spi_total_workers; i++)
366  {
367  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
368  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
369  worker.bgw_main_arg = Int32GetDatum(i);
370 
371  RegisterBackgroundWorker(&worker);
372  }
373 }
374 
375 /*
376  * Dynamically launch an SPI worker.
377  */
378 Datum
380 {
381  int32 i = PG_GETARG_INT32(0);
382  BackgroundWorker worker;
383  BackgroundWorkerHandle *handle;
385  pid_t pid;
386 
387  memset(&worker, 0, sizeof(worker));
392  sprintf(worker.bgw_library_name, "worker_spi");
393  sprintf(worker.bgw_function_name, "worker_spi_main");
394  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
395  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
396  worker.bgw_main_arg = Int32GetDatum(i);
397  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
398  worker.bgw_notify_pid = MyProcPid;
399 
400  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
401  PG_RETURN_NULL();
402 
403  status = WaitForBackgroundWorkerStartup(handle, &pid);
404 
405  if (status == BGWH_STOPPED)
406  ereport(ERROR,
407  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
408  errmsg("could not start background process"),
409  errhint("More details may be available in the server log.")));
410  if (status == BGWH_POSTMASTER_DIED)
411  ereport(ERROR,
412  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
413  errmsg("cannot start background processes without postmaster"),
414  errhint("Kill all remaining database processes and restart the database.")));
415  Assert(status == BGWH_STARTED);
416 
417  PG_RETURN_INT32(pid);
418 }
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:8599
Datum worker_spi_launch(PG_FUNCTION_ARGS)
Definition: worker_spi.c:379
#define PG_GETARG_INT32(n)
Definition: fmgr.h:264
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1096
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:848
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:974
static volatile sig_atomic_t got_sigterm
Definition: autoprewarm.c:100
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10628
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1522
#define DatumGetInt32(X)
Definition: postgres.h:472
int SPI_connect(void)
Definition: spi.c:89
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
PG_MODULE_MAGIC
Definition: worker_spi.c:44
void worker_spi_main(Datum)
Definition: worker_spi.c:49
static void worker_spi_sigterm(SIGNAL_ARGS)
Definition: worker_spi.c:73
char * pstrdup(const char *in)
Definition: mcxt.c:1161
void CommitTransactionCommand(void)
Definition: xact.c:2895
int SPI_finish(void)
Definition: spi.c:176
#define PG_RETURN_INT32(x)
Definition: fmgr.h:344
SPITupleTable * SPI_tuptable
Definition: spi.c:46
int bgw_restart_time
Definition: bgworker.h:94
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:192
worktable
Definition: worker_spi.c:65
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define LOG
Definition: elog.h:26
void _PG_init(void)
Definition: worker_spi.c:308
HeapTuple * vals
Definition: spi.h:26
void SetLatch(Latch *latch)
Definition: latch.c:436
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
uint64 SPI_processed
Definition: spi.c:45
void ResetLatch(Latch *latch)
Definition: latch.c:519
signed int int32
Definition: c.h:346
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
Definition: type.h:89
#define sprintf
Definition: port.h:194
Datum bgw_main_arg
Definition: bgworker.h:97
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:1028
#define DatumGetInt64(X)
Definition: postgres.h:607
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
PG_FUNCTION_INFO_V1(worker_spi_launch)
#define SPI_OK_UTILITY
Definition: spi.h:56
#define SIGHUP
Definition: win32_port.h:163
#define SPI_OK_UPDATE_RETURNING
Definition: spi.h:65
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
BgwHandleStatus
Definition: bgworker.h:102
#define ereport(elevel, rest)
Definition: elog.h:141
Definition: guc.h:72
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static void initialize_worker_spi(worktable *table)
Definition: worker_spi.c:104
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: postmaster.c:5610
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:8659
uintptr_t Datum
Definition: postgres.h:367
#define PG_WAIT_EXTENSION
Definition: pgstat.h:759
TupleDesc tupdesc
Definition: spi.h:25
#define SPI_OK_SELECT
Definition: spi.h:57
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIGNAL_ARGS
Definition: c.h:1259
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:732
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
void StartTransactionCommand(void)
Definition: xact.c:2794
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:93
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:932
const char * name
Definition: encode.c:521
static volatile sig_atomic_t got_sighup
Definition: autoprewarm.c:101
#define pg_attribute_noreturn()
Definition: c.h:147
#define Int32GetDatum(X)
Definition: postgres.h:479
static void worker_spi_sighup(SIGNAL_ARGS)
Definition: worker_spi.c:89
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:91
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:818
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
pid_t bgw_notify_pid
Definition: bgworker.h:99
#define elog(elevel,...)
Definition: elog.h:226
int i
struct Latch * MyLatch
Definition: globals.c:54
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
long val
Definition: informix.c:684
#define PG_RETURN_NULL()
Definition: fmgr.h:335
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:496
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5662
void pgstat_report_stat(bool force)
Definition: pgstat.c:813