PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker_spi.c
Go to the documentation of this file.
1 /* -------------------------------------------------------------------------
2  *
3  * worker_spi.c
4  * Sample background worker code that demonstrates various coding
5  * patterns: establishing a database connection; starting and committing
6  * transactions; using GUC variables, and heeding SIGHUP to reread
7  * the configuration file; reporting to pg_stat_activity; using the
8  * process latch to sleep and exit in case of postmaster death.
9  *
10  * This code connects to a database, creates a schema and table, and summarizes
11  * the numbers contained therein. To see it working, insert an initial row
12  * with "total" type and some initial value; then insert some other rows with
13  * "delta" type. Delta rows will be deleted by this worker and their values
14  * aggregated into the total.
15  *
16  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
17  *
18  * IDENTIFICATION
19  * src/test/modules/worker_spi/worker_spi.c
20  *
21  * -------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24 
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33 
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43 
45 
47 
48 void _PG_init(void);
50 
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54 
55 /* GUC variables */
56 static int worker_spi_naptime = 10;
57 static int worker_spi_total_workers = 2;
58 
59 
/*
 * Names of the schema and table that one worker operates on.
 *
 * NOTE(review): the closing line of this typedef (listing line 64) is absent
 * from this extracted listing.
 */
60 typedef struct worktable
61 {
62  const char *schema; /* schema name; worker_spi_main builds it as "schema%d" */
63  const char *name; /* table name; worker_spi_main sets it to "counted" */
65 
66 /*
67  * Signal handler for SIGTERM
68  * Set a flag to tell the main loop to terminate, and set our latch to wake
69  * it up.
70  */
/*
 * NOTE(review): listing lines 72 (the function-name line) and 77 are missing
 * from this extracted listing. Presumably they are the
 * worker_spi_sigterm(SIGNAL_ARGS) declarator and the latch wake-up mentioned
 * in the header comment -- confirm against the real file.
 */
71 static void
73 {
74  int save_errno = errno; /* saved so the handler leaves errno unchanged */
75 
76  got_sigterm = true; /* tested by the while condition in worker_spi_main */
78 
79  errno = save_errno;
80 }
81 
82 /*
83  * Signal handler for SIGHUP
84  * Set a flag to tell the main loop to reread the config file, and set
85  * our latch to wake it up.
86  */
/*
 * NOTE(review): listing lines 88 (the function-name line) and 93 are missing
 * from this extracted listing. Presumably they are the
 * worker_spi_sighup(SIGNAL_ARGS) declarator and the latch wake-up mentioned
 * in the header comment -- confirm against the real file.
 */
87 static void
89 {
90  int save_errno = errno; /* saved so the handler leaves errno unchanged */
91 
92  got_sighup = true; /* tested and cleared inside the main loop */
94 
95  errno = save_errno;
96 }
97 
98 /*
99  * Initialize workspace for a worker process: create the schema if it doesn't
100  * already exist.
101  */
/*
 * Checks pg_namespace for table->schema and, when no such schema exists,
 * creates the schema, the table and a partial unique index in one statement.
 * Any unexpected SPI result is reported with elog(FATAL).
 *
 * NOTE(review): listing lines 103, 108, 110-111, 113, 128-129, 147 and
 * 156-158 are missing from this extracted listing (the function-name line,
 * the declaration of buf, the assignment to ntup, and several calls around
 * SPI_connect/SPI_finish). Confirm against the real file before relying on
 * the statement order shown here.
 */
102 static void
104 {
105  int ret;
106  int ntup;
107  bool isnull;
109 
112  SPI_connect();
114  pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
115 
116  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
117  initStringInfo(&buf);
  /* the schema name is spliced unquoted into a SQL string literal here */
118  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
119  table->schema);
120 
  /* second argument true: run the existence check as a read-only query */
121  ret = SPI_execute(buf.data, true, 0);
122  if (ret != SPI_OK_SELECT)
123  elog(FATAL, "SPI_execute failed: error code %d", ret);
124 
  /* count(*) is expected to yield exactly one row */
125  if (SPI_processed != 1)
126  elog(FATAL, "not a singleton result");
127 
  /*
   * NOTE(review): the start of this statement is missing; presumably it
   * assigns column 1 of the single result row to ntup -- confirm.
   */
130  1, &isnull));
131  if (isnull)
132  elog(FATAL, "null result");
133 
  /* zero matching pg_namespace rows: the schema does not exist yet */
134  if (ntup == 0)
135  {
136  resetStringInfo(&buf);
137  appendStringInfo(&buf,
138  "CREATE SCHEMA \"%s\" "
139  "CREATE TABLE \"%s\" ("
140  " type text CHECK (type IN ('total', 'delta')), "
141  " value integer)"
142  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
143  "WHERE type = 'total'",
144  table->schema, table->name, table->name, table->name);
145 
146  /* set statement start time */
148 
  /* second argument false: this statement modifies the database */
149  ret = SPI_execute(buf.data, false, 0);
150 
151  if (ret != SPI_OK_UTILITY)
152  elog(FATAL, "failed to create my schema");
153  }
154 
155  SPI_finish();
159 }
160 
/*
 * Main entry point of a worker process.
 *
 * main_arg carries the worker's integer index. The worker derives its schema
 * name from that index, calls initialize_worker_spi(), builds one aggregation
 * statement, and then repeats "wait on the latch, run the statement, log the
 * result" until got_sigterm is set.
 *
 * NOTE(review): many lines are missing from this extracted listing (listing
 * lines 166, 175, 179, 182, 229, 231-232, 238, 246, 265-266, 268-269,
 * 283-284, 288, 296-297 and 299), including the declaration of buf, the
 * remaining WaitLatch arguments and the transaction/snapshot calls. Confirm
 * against the real file.
 */
161 void
162 worker_spi_main(Datum main_arg)
163 {
164  int index = DatumGetInt32(main_arg);
165  worktable *table;
  /* "schema" plus any 32-bit decimal integer and the terminator fits in 20 bytes */
167  char name[20];
168 
169  table = palloc(sizeof(worktable));
170  sprintf(name, "schema%d", index);
171  table->schema = pstrdup(name);
172  table->name = pstrdup("counted");
173 
174  /* Establish signal handlers before unblocking signals. */
176  pqsignal(SIGTERM, worker_spi_sigterm);
177 
178  /* We're now ready to receive signals */
180 
181  /* Connect to our database */
183 
184  elog(LOG, "%s initialized with %s.%s",
185  MyBgworkerEntry->bgw_name, table->schema, table->name);
186  initialize_worker_spi(table);
187 
188  /*
189  * Quote identifiers passed to us. Note that this must be done after
190  * initialize_worker_spi, because that routine assumes the names are not
191  * quoted.
192  *
193  * Note some memory might be leaked here.
194  */
195  table->schema = quote_identifier(table->schema);
196  table->name = quote_identifier(table->name);
197 
  /*
   * Build the statement once, outside the loop: delete all 'delta' rows,
   * sum their values, and add that sum to the 'total' row, returning the
   * new total.
   */
198  initStringInfo(&buf);
199  appendStringInfo(&buf,
200  "WITH deleted AS (DELETE "
201  "FROM %s.%s "
202  "WHERE type = 'delta' RETURNING value), "
203  "total AS (SELECT coalesce(sum(value), 0) as sum "
204  "FROM deleted) "
205  "UPDATE %s.%s "
206  "SET value = %s.value + total.sum "
207  "FROM total WHERE type = 'total' "
208  "RETURNING %s.value",
209  table->schema, table->name,
210  table->schema, table->name,
211  table->name,
212  table->name);
213 
214  /*
215  * Main loop: do this until the SIGTERM handler tells us to terminate
216  */
217  while (!got_sigterm)
218  {
219  int ret;
220  int rc;
221 
222  /*
223  * Background workers mustn't call usleep() or any direct equivalent:
224  * instead, they may wait on their process latch, which sleeps as
225  * necessary, but is awakened if postmaster dies. That way the
226  * background process goes away immediately in an emergency.
227  */
  /*
   * worker_spi_naptime is in seconds (per its GUC description); the
   * factor 1000 presumably converts to the milliseconds WaitLatch
   * expects -- confirm.
   */
228  rc = WaitLatch(MyLatch,
230  worker_spi_naptime * 1000L,
233 
234  /* emergency bailout if postmaster has died */
235  if (rc & WL_POSTMASTER_DEATH)
236  proc_exit(1);
237 
239 
240  /*
241  * In case of a SIGHUP, just reload the configuration.
242  */
243  if (got_sighup)
244  {
245  got_sighup = false;
247  }
248 
249  /*
250  * Start a transaction on which we can run queries. Note that each
251  * StartTransactionCommand() call should be preceded by a
252  * SetCurrentStatementStartTimestamp() call, which sets both the time
253  * for the statement we're about to run, and also the transaction
254  * start time. Also, each other query sent to SPI should probably be
255  * preceded by SetCurrentStatementStartTimestamp(), so that statement
256  * start time is always up to date.
257  *
258  * The SPI_connect() call lets us run queries through the SPI manager,
259  * and the PushActiveSnapshot() call creates an "active" snapshot
260  * which is necessary for queries to have MVCC data to work on.
261  *
262  * The pgstat_report_activity() call makes our activity visible
263  * through the pgstat views.
264  */
267  SPI_connect();
270 
271  /* We can now execute queries via SPI */
272  ret = SPI_execute(buf.data, false, 0);
273 
  /*
   * NOTE(review): the message below says "select" although the statement
   * being run is an UPDATE ... RETURNING.
   */
274  if (ret != SPI_OK_UPDATE_RETURNING)
275  elog(FATAL, "cannot select from table %s.%s: error code %d",
276  table->schema, table->name, ret);
277 
  /* no row comes back when the table has no 'total' row to update */
278  if (SPI_processed > 0)
279  {
280  bool isnull;
281  int32 val;
282 
  /*
   * NOTE(review): the start of this statement is missing; presumably
   * it assigns column 1 of the returned row to val -- confirm.
   */
285  1, &isnull));
286  if (!isnull)
287  elog(LOG, "%s: count in %s.%s is now %d",
289  table->schema, table->name, val);
290  }
291 
292  /*
293  * And finish our transaction.
294  */
295  SPI_finish();
298  pgstat_report_stat(false);
300  }
301 
  /* reached once the loop observes got_sigterm; exits with status 1 */
302  proc_exit(1);
303 }
304 
305 /*
306  * Entrypoint of this module.
307  *
308  * We register more than one worker process here, to demonstrate how that can
309  * be done.
310  */
/*
 * Defines the worker_spi.naptime and worker_spi.total_workers settings and
 * registers worker_spi_total_workers background workers, numbered from 1,
 * each running worker_spi_main from library "worker_spi".
 *
 * NOTE(review): listing lines 331, 341 and 349-352 are missing from this
 * extracted listing (the condition guarding the early return, the context
 * argument of the second setting, and several worker field assignments).
 * Confirm against the real file.
 */
311 void
312 _PG_init(void)
313 {
314  BackgroundWorker worker;
315  unsigned int i;
316 
317  /* get the configuration */
  /* naptime: boot value 10, minimum 1, maximum INT_MAX, context PGC_SIGHUP */
318  DefineCustomIntVariable("worker_spi.naptime",
319  "Duration between each check (in seconds).",
320  NULL,
321  &worker_spi_naptime,
322  10,
323  1,
324  INT_MAX,
325  PGC_SIGHUP,
326  0,
327  NULL,
328  NULL,
329  NULL);
330 
  /*
   * NOTE(review): the guard for this return is missing from the listing;
   * presumably it tests process_shared_preload_libraries_in_progress so
   * that the rest only runs during shared-library preload -- confirm.
   */
332  return;
333 
  /* total_workers: boot value 2, minimum 1, maximum 100 */
334  DefineCustomIntVariable("worker_spi.total_workers",
335  "Number of workers.",
336  NULL,
337  &worker_spi_total_workers,
338  2,
339  1,
340  100,
342  0,
343  NULL,
344  NULL,
345  NULL);
346 
347  /* set up common data for all our workers */
348  memset(&worker, 0, sizeof(worker));
353  sprintf(worker.bgw_library_name, "worker_spi");
354  sprintf(worker.bgw_function_name, "worker_spi_main");
355  worker.bgw_notify_pid = 0;
356 
357  /*
358  * Now fill in worker-specific data, and do the actual registrations.
359  */
  /*
   * NOTE(review): i is unsigned int but is compared with the int
   * worker_spi_total_workers and formatted with %d below.
   */
360  for (i = 1; i <= worker_spi_total_workers; i++)
361  {
362  snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
  /* the index becomes main_arg, which worker_spi_main decodes with DatumGetInt32 */
363  worker.bgw_main_arg = Int32GetDatum(i);
364 
365  RegisterBackgroundWorker(&worker);
366  }
367 }
368 
369 /*
370  * Dynamically launch an SPI worker.
371  */
/*
 * Takes the worker index as int32 argument 0, registers a dynamic background
 * worker running worker_spi_main, waits for it to start, and returns its PID
 * as int32. Returns NULL when registration fails; raises ERROR when the
 * worker stopped or the postmaster died before startup completed.
 *
 * NOTE(review): listing lines 373 (the function-name line), 378 (presumably
 * the declaration of status) and 382-385 (worker field assignments) are
 * missing from this extracted listing. Confirm against the real file.
 */
372 Datum
374 {
375  int32 i = PG_GETARG_INT32(0);
376  BackgroundWorker worker;
377  BackgroundWorkerHandle *handle;
379  pid_t pid;
380 
381  memset(&worker, 0, sizeof(worker));
386  sprintf(worker.bgw_library_name, "worker_spi");
387  sprintf(worker.bgw_function_name, "worker_spi_main");
388  snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
389  worker.bgw_main_arg = Int32GetDatum(i);
390  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
391  worker.bgw_notify_pid = MyProcPid;
392 
  /* registration refused: report SQL NULL rather than raising an error */
393  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
394  PG_RETURN_NULL();
395 
  /* on return, pid has been passed by address for the callee to fill in */
396  status = WaitForBackgroundWorkerStartup(handle, &pid);
397 
398  if (status == BGWH_STOPPED)
399  ereport(ERROR,
400  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
401  errmsg("could not start background process"),
402  errhint("More details may be available in the server log.")));
403  if (status == BGWH_POSTMASTER_DIED)
404  ereport(ERROR,
405  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
406  errmsg("cannot start background processes without postmaster"),
407  errhint("Kill all remaining database processes and restart the database.")));
  /* both failure statuses raised ERROR above, so only BGWH_STARTED should remain */
408  Assert(status == BGWH_STARTED);
409 
410  PG_RETURN_INT32(pid);
411 }
typedef(PHANDLE, HANDLE, WAITORTIMERCALLBACK, PVOID, ULONG, ULONG)
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:7766
Datum worker_spi_launch(PG_FUNCTION_ARGS)
Definition: worker_spi.c:373
#define PG_GETARG_INT32(n)
Definition: fmgr.h:234
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1090
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:849
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10381
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1426
#define DatumGetInt32(X)
Definition: postgres.h:478
int SPI_connect(void)
Definition: spi.c:84
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
PG_MODULE_MAGIC
Definition: worker_spi.c:44
void worker_spi_main(Datum)
Definition: worker_spi.c:49
static void worker_spi_sigterm(SIGNAL_ARGS)
Definition: worker_spi.c:72
char * pstrdup(const char *in)
Definition: mcxt.c:1077
void CommitTransactionCommand(void)
Definition: xact.c:2750
int SPI_finish(void)
Definition: spi.c:148
#define PG_RETURN_INT32(x)
Definition: fmgr.h:314
SPITupleTable * SPI_tuptable
Definition: spi.c:41
int bgw_restart_time
Definition: bgworker.h:93
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:190
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
worktable
Definition: worker_spi.c:64
#define false
Definition: c.h:210
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
void _PG_init(void)
Definition: worker_spi.c:312
HeapTuple * vals
Definition: spi.h:28
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:304
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
uint64 SPI_processed
Definition: spi.c:39
signed int int32
Definition: c.h:256
Definition: type.h:89
Datum bgw_main_arg
Definition: bgworker.h:96
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
#define FATAL
Definition: elog.h:52
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:836
#define DatumGetInt64(X)
Definition: postgres.h:613
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
PG_FUNCTION_INFO_V1(worker_spi_launch)
#define SPI_OK_UTILITY
Definition: spi.h:53
#define SPI_OK_UPDATE_RETURNING
Definition: spi.h:62
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
BgwHandleStatus
Definition: bgworker.h:101
#define ereport(elevel, rest)
Definition: elog.h:122
Definition: guc.h:72
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static void initialize_worker_spi(worktable *table)
Definition: worker_spi.c:103
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
uintptr_t Datum
Definition: postgres.h:372
#define PG_WAIT_EXTENSION
Definition: pgstat.h:742
TupleDesc tupdesc
Definition: spi.h:27
#define SPI_OK_SELECT
Definition: spi.h:54
#define SIGHUP
Definition: win32.h:188
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define SIGNAL_ARGS
Definition: c.h:1080
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:676
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
void StartTransactionCommand(void)
Definition: xact.c:2680
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:933
const char * name
Definition: encode.c:521
#define pg_attribute_noreturn()
Definition: c.h:654
#define Int32GetDatum(X)
Definition: postgres.h:485
static void worker_spi_sighup(SIGNAL_ARGS)
Definition: worker_spi.c:88
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:740
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:98
int i
struct Latch * MyLatch
Definition: globals.c:52
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
#define WL_LATCH_SET
Definition: latch.h:124
long val
Definition: informix.c:689
void BackgroundWorkerInitializeConnection(char *dbname, char *username)
Definition: postmaster.c:5527
#define PG_RETURN_NULL()
Definition: fmgr.h:305
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:304
void BackgroundWorkerUnblockSignals(void)
Definition: postmaster.c:5579
void pgstat_report_stat(bool force)
Definition: pgstat.c:812