PostgreSQL Source Code  git master
worker_spi.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
Include dependency graph for worker_spi.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (worker_spi_launch)
 
void _PG_init (void)
 
void worker_spi_main (Datum)
 
static void worker_spi_sigterm (SIGNAL_ARGS)
 
static void worker_spi_sighup (SIGNAL_ARGS)
 
static void initialize_worker_spi (worktable *table)
 
Datum worker_spi_launch (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 
 worktable
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 308 of file worker_spi.c.

References auth_delay_checks(), auth_delay_milliseconds, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ClientAuthentication_hook, DefineCustomIntVariable(), DefineCustomStringVariable(), GUC_UNIT_MS, i, Int32GetDatum, original_client_auth_hook, PGC_POSTMASTER, PGC_SIGHUP, process_shared_preload_libraries_in_progress, RegisterBackgroundWorker(), snprintf, and sprintf.

309 {
310  BackgroundWorker worker;
311  unsigned int i;
312 
313  /* get the configuration */
314  DefineCustomIntVariable("worker_spi.naptime",
315  "Duration between each check (in seconds).",
316  NULL,
317  &worker_spi_naptime,
318  10,
319  1,
320  INT_MAX,
321  PGC_SIGHUP,
322  0,
323  NULL,
324  NULL,
325  NULL);
326 
327  if (!process_shared_preload_libraries_in_progress)
328  return;
329 
330  DefineCustomIntVariable("worker_spi.total_workers",
331  "Number of workers.",
332  NULL,
333  &worker_spi_total_workers,
334  2,
335  1,
336  100,
337  PGC_POSTMASTER,
338  0,
339  NULL,
340  NULL,
341  NULL);
342 
343  DefineCustomStringVariable("worker_spi.database",
344  "Database to connect to.",
345  NULL,
346  &worker_spi_database,
347  "postgres",
349  0,
350  NULL, NULL, NULL);
351 
352  /* set up common data for all our workers */
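353  memset(&worker, 0, sizeof(worker));
354  worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
355  BGWORKER_BACKEND_DATABASE_CONNECTION;
356  worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
357  worker.bgw_restart_time = BGW_NEVER_RESTART;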
358  sprintf(worker.bgw_library_name, "worker_spi");
359  sprintf(worker.bgw_function_name, "worker_spi_main");
360  worker.bgw_notify_pid = 0;
361 
362  /*
363  * Now fill in worker-specific data, and do the actual registrations.
364  */
365  for (i = 1; i <= worker_spi_total_workers; i++)
366  {
367  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
368  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
369  worker.bgw_main_arg = Int32GetDatum(i);
370 
371  RegisterBackgroundWorker(&worker);
372  }
373 }
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:8599
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:848
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1522
int bgw_restart_time
Definition: bgworker.h:94
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
#define sprintf
Definition: port.h:194
Datum bgw_main_arg
Definition: bgworker.h:97
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
PGC_SIGHUP
Definition: guc.h:72
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:8659
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:93
#define Int32GetDatum(X)
Definition: postgres.h:479
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:91
pid_t bgw_notify_pid
Definition: bgworker.h:99
int i
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define snprintf
Definition: port.h:192

◆ initialize_worker_spi()

static void initialize_worker_spi ( worktable * table)
static

Definition at line 104 of file worker_spi.c.

References appendStringInfo(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, buf, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), StringInfoData::data, DatumGetInt32, DatumGetInt64, elog, FATAL, GetTransactionSnapshot(), got_sighup, got_sigterm, initStringInfo(), LOG, MyBgworkerEntry, MyLatch, name, palloc(), PG_WAIT_EXTENSION, PGC_SIGHUP, pgstat_report_activity(), pgstat_report_stat(), PopActiveSnapshot(), pqsignal(), proc_exit(), ProcessConfigFile(), pstrdup(), PushActiveSnapshot(), quote_identifier(), ResetLatch(), resetStringInfo(), SetCurrentStatementStartTimestamp(), SIGHUP, SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_SELECT, SPI_OK_UPDATE_RETURNING, SPI_OK_UTILITY, SPI_processed, SPI_tuptable, sprintf, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, val, SPITupleTable::vals, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, worker_spi_main(), worker_spi_sighup(), worker_spi_sigterm(), and worktable.

105 {
106  int ret;
107  int ntup;
108  bool isnull;
109  StringInfoData buf;
110 
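111  SetCurrentStatementStartTimestamp();
112  StartTransactionCommand();
113  SPI_connect();
114  PushActiveSnapshot(GetTransactionSnapshot());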
115  pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
116 
117  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
118  initStringInfo(&buf);
119  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
120  table->schema);
121 
122  ret = SPI_execute(buf.data, true, 0);
123  if (ret != SPI_OK_SELECT)
124  elog(FATAL, "SPI_execute failed: error code %d", ret);
125 
126  if (SPI_processed != 1)
127  elog(FATAL, "not a singleton result");
128 
129  ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
130  SPI_tuptable->tupdesc,
131  1, &isnull));
132  if (isnull)
133  elog(FATAL, "null result");
134 
135  if (ntup == 0)
136  {
137  resetStringInfo(&buf);
138  appendStringInfo(&buf,
139  "CREATE SCHEMA \"%s\" "
140  "CREATE TABLE \"%s\" ("
141  " type text CHECK (type IN ('total', 'delta')), "
142  " value integer)"
143  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
144  "WHERE type = 'total'",
145  table->schema, table->name, table->name, table->name);
146 
147  /* set statement start time */
148  SetCurrentStatementStartTimestamp();
149 
150  ret = SPI_execute(buf.data, false, 0);
151 
152  if (ret != SPI_OK_UTILITY)
153  elog(FATAL, "failed to create my schema");
154  }
155 
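156  SPI_finish();
157  PopActiveSnapshot();
158  CommitTransactionCommand();
159  pgstat_report_activity(STATE_IDLE, NULL);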
160 }
int SPI_connect(void)
Definition: spi.c:89
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
void CommitTransactionCommand(void)
Definition: xact.c:2895
int SPI_finish(void)
Definition: spi.c:176
SPITupleTable * SPI_tuptable
Definition: spi.c:46
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
HeapTuple * vals
Definition: spi.h:26
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
uint64 SPI_processed
Definition: spi.c:45
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define FATAL
Definition: elog.h:52
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:1028
#define DatumGetInt64(X)
Definition: postgres.h:607
static char * buf
Definition: pg_test_fsync.c:68
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define SPI_OK_UTILITY
Definition: spi.h:56
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
TupleDesc tupdesc
Definition: spi.h:25
#define SPI_OK_SELECT
Definition: spi.h:57
void StartTransactionCommand(void)
Definition: xact.c:2794
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:818
#define elog(elevel,...)
Definition: elog.h:226
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:496

◆ PG_FUNCTION_INFO_V1()

PG_FUNCTION_INFO_V1 ( worker_spi_launch  )

◆ worker_spi_launch()

Datum worker_spi_launch ( PG_FUNCTION_ARGS  )

Definition at line 379 of file worker_spi.c.

References Assert, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWH_POSTMASTER_DIED, BGWH_STARTED, BGWH_STOPPED, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ereport, errcode(), errhint(), errmsg(), ERROR, i, Int32GetDatum, MyProcPid, PG_GETARG_INT32, PG_RETURN_INT32, PG_RETURN_NULL, RegisterDynamicBackgroundWorker(), snprintf, sprintf, status(), and WaitForBackgroundWorkerStartup().

380 {
381  int32 i = PG_GETARG_INT32(0);
382  BackgroundWorker worker;
383  BackgroundWorkerHandle *handle;
384  BgwHandleStatus status;
385  pid_t pid;
386 
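387  memset(&worker, 0, sizeof(worker));
388  worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
389  BGWORKER_BACKEND_DATABASE_CONNECTION;
390  worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
391  worker.bgw_restart_time = BGW_NEVER_RESTART;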
392  sprintf(worker.bgw_library_name, "worker_spi");
393  sprintf(worker.bgw_function_name, "worker_spi_main");
394  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
395  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
396  worker.bgw_main_arg = Int32GetDatum(i);
397  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
398  worker.bgw_notify_pid = MyProcPid;
399 
400  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
401  PG_RETURN_NULL();
402 
403  status = WaitForBackgroundWorkerStartup(handle, &pid);
404 
405  if (status == BGWH_STOPPED)
406  ereport(ERROR,
407  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
408  errmsg("could not start background process"),
409  errhint("More details may be available in the server log.")));
410  if (status == BGWH_POSTMASTER_DIED)
411  ereport(ERROR,
412  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
413  errmsg("cannot start background processes without postmaster"),
414  errhint("Kill all remaining database processes and restart the database.")));
415  Assert(status == BGWH_STARTED);
416 
417  PG_RETURN_INT32(pid);
418 }
#define PG_GETARG_INT32(n)
Definition: fmgr.h:264
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1096
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:974
#define PG_RETURN_INT32(x)
Definition: fmgr.h:344
int bgw_restart_time
Definition: bgworker.h:94
int errcode(int sqlerrcode)
Definition: elog.c:570
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
signed int int32
Definition: c.h:346
#define sprintf
Definition: port.h:194
Datum bgw_main_arg
Definition: bgworker.h:97
#define ERROR
Definition: elog.h:43
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
BgwHandleStatus
Definition: bgworker.h:102
#define ereport(elevel, rest)
Definition: elog.h:141
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:732
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:93
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:932
#define Int32GetDatum(X)
Definition: postgres.h:479
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:784
pid_t bgw_notify_pid
Definition: bgworker.h:99
int i
static void status(const char *fmt,...) pg_attribute_printf(1, 2)
Definition: pg_regress.c:227
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define snprintf
Definition: port.h:192
#define PG_RETURN_NULL()
Definition: fmgr.h:335

◆ worker_spi_main()

void worker_spi_main ( Datum  main_arg)

Definition at line 49 of file worker_spi.c.

Referenced by initialize_worker_spi().

62 {
63  const char *schema;
64  const char *name;
65 } worktable;
worktable
Definition: worker_spi.c:65
const char * name
Definition: encode.c:521

◆ worker_spi_sighup()

static void worker_spi_sighup ( SIGNAL_ARGS  )
static

Definition at line 89 of file worker_spi.c.

References got_sighup, MyLatch, and SetLatch().

Referenced by initialize_worker_spi().

90 {
91  int save_errno = errno;
92 
93  got_sighup = true;
94  SetLatch(MyLatch);
95 
96  errno = save_errno;
97 }
void SetLatch(Latch *latch)
Definition: latch.c:436
static volatile sig_atomic_t got_sighup
Definition: autoprewarm.c:101
struct Latch * MyLatch
Definition: globals.c:54

◆ worker_spi_sigterm()

static void worker_spi_sigterm ( SIGNAL_ARGS  )
static

Definition at line 73 of file worker_spi.c.

References got_sigterm, MyLatch, and SetLatch().

Referenced by initialize_worker_spi().

74 {
75  int save_errno = errno;
76 
77  got_sigterm = true;
78  SetLatch(MyLatch);
79 
80  errno = save_errno;
81 }
static volatile sig_atomic_t got_sigterm
Definition: autoprewarm.c:100
void SetLatch(Latch *latch)
Definition: latch.c:436
struct Latch * MyLatch
Definition: globals.c:54

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 44 of file worker_spi.c.

◆ worktable

worktable

Definition at line 65 of file worker_spi.c.

Referenced by initialize_worker_spi().