PostgreSQL Source Code  git master
worker_spi.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
Include dependency graph for worker_spi.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (worker_spi_launch)
 
PGDLLEXPORT void worker_spi_main (Datum)
 
static void initialize_worker_spi (worktable *table)
 
void _PG_init (void)
 
Datum worker_spi_launch (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 
 worktable
 

Function Documentation

◆ _PG_init()

void _PG_init ( void  )

Definition at line 281 of file worker_spi.c.

282 {
283  BackgroundWorker worker;
284 
285  /* get the configuration */
286  DefineCustomIntVariable("worker_spi.naptime",
287  "Duration between each check (in seconds).",
288  NULL,
289  &worker_spi_naptime,
290  10,
291  1,
292  INT_MAX,
293  PGC_SIGHUP,
294  0,
295  NULL,
296  NULL,
297  NULL);
298 
300  return;
301 
302  DefineCustomIntVariable("worker_spi.total_workers",
303  "Number of workers.",
304  NULL,
305  &worker_spi_total_workers,
306  2,
307  1,
308  100,
310  0,
311  NULL,
312  NULL,
313  NULL);
314 
315  DefineCustomStringVariable("worker_spi.database",
316  "Database to connect to.",
317  NULL,
318  &worker_spi_database,
319  "postgres",
321  0,
322  NULL, NULL, NULL);
323 
324  MarkGUCPrefixReserved("worker_spi");
325 
326  /* set up common data for all our workers */
327  memset(&worker, 0, sizeof(worker));
332  sprintf(worker.bgw_library_name, "worker_spi");
333  sprintf(worker.bgw_function_name, "worker_spi_main");
334  worker.bgw_notify_pid = 0;
335 
336  /*
337  * Now fill in worker-specific data, and do the actual registrations.
338  */
339  for (int i = 1; i <= worker_spi_total_workers; i++)
340  {
341  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
342  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
343  worker.bgw_main_arg = Int32GetDatum(i);
344 
345  RegisterBackgroundWorker(&worker);
346  }
347 }
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:875
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:5044
void MarkGUCPrefixReserved(const char *className)
Definition: guc.c:5105
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:4984
@ PGC_POSTMASTER
Definition: guc.h:70
@ PGC_SIGHUP
Definition: guc.h:71
int i
Definition: isn.c:73
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1782
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:96

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, DefineCustomIntVariable(), DefineCustomStringVariable(), i, Int32GetDatum(), MarkGUCPrefixReserved(), PGC_POSTMASTER, PGC_SIGHUP, process_shared_preload_libraries_in_progress, RegisterBackgroundWorker(), snprintf, and sprintf.

◆ initialize_worker_spi()

static void initialize_worker_spi ( worktable table)
static

Definition at line 68 of file worker_spi.c.

69 {
70  int ret;
71  int ntup;
72  bool isnull;
74 
77  SPI_connect();
79  pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
80 
81  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
83  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
84  table->schema);
85 
86  debug_query_string = buf.data;
87  ret = SPI_execute(buf.data, true, 0);
88  if (ret != SPI_OK_SELECT)
89  elog(FATAL, "SPI_execute failed: error code %d", ret);
90 
91  if (SPI_processed != 1)
92  elog(FATAL, "not a singleton result");
93 
96  1, &isnull));
97  if (isnull)
98  elog(FATAL, "null result");
99 
100  if (ntup == 0)
101  {
102  debug_query_string = NULL;
105  "CREATE SCHEMA \"%s\" "
106  "CREATE TABLE \"%s\" ("
107  " type text CHECK (type IN ('total', 'delta')), "
108  " value integer)"
109  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
110  "WHERE type = 'total'",
111  table->schema, table->name, table->name, table->name);
112 
113  /* set statement start time */
115 
116  debug_query_string = buf.data;
117  ret = SPI_execute(buf.data, false, 0);
118 
119  if (ret != SPI_OK_UTILITY)
120  elog(FATAL, "failed to create my schema");
121 
122  debug_query_string = NULL; /* rest is not statement-specific */
123  }
124 
125  SPI_finish();
128  debug_query_string = NULL;
130 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
@ STATE_RUNNING
#define FATAL
Definition: elog.h:41
static char * buf
Definition: pg_test_fsync.c:67
const char * debug_query_string
Definition: postgres.c:81
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:385
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:251
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:683
void PopActiveSnapshot(void)
Definition: snapmgr.c:778
uint64 SPI_processed
Definition: spi.c:45
SPITupleTable * SPI_tuptable
Definition: spi.c:46
int SPI_connect(void)
Definition: spi.c:95
int SPI_finish(void)
Definition: spi.c:183
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:594
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:1250
#define SPI_OK_UTILITY
Definition: spi.h:85
#define SPI_OK_SELECT
Definition: spi.h:86
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
void StartTransactionCommand(void)
Definition: xact.c:2938
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:899
void CommitTransactionCommand(void)
Definition: xact.c:3035

References appendStringInfo(), buf, CommitTransactionCommand(), DatumGetInt64(), debug_query_string, elog(), FATAL, GetTransactionSnapshot(), initStringInfo(), pgstat_report_activity(), PopActiveSnapshot(), PushActiveSnapshot(), resetStringInfo(), SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_SELECT, SPI_OK_UTILITY, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, and SPITupleTable::vals.

◆ PG_FUNCTION_INFO_V1()

PG_FUNCTION_INFO_V1 ( worker_spi_launch  )

◆ worker_spi_launch()

Datum worker_spi_launch ( PG_FUNCTION_ARGS  )

Definition at line 353 of file worker_spi.c.

354 {
355  int32 i = PG_GETARG_INT32(0);
356  BackgroundWorker worker;
357  BackgroundWorkerHandle *handle;
358  BgwHandleStatus status;
359  pid_t pid;
360 
361  memset(&worker, 0, sizeof(worker));
366  sprintf(worker.bgw_library_name, "worker_spi");
367  sprintf(worker.bgw_function_name, "worker_spi_main");
368  snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
369  snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
370  worker.bgw_main_arg = Int32GetDatum(i);
371  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
372  worker.bgw_notify_pid = MyProcPid;
373 
374  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
375  PG_RETURN_NULL();
376 
377  status = WaitForBackgroundWorkerStartup(handle, &pid);
378 
379  if (status == BGWH_STOPPED)
380  ereport(ERROR,
381  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
382  errmsg("could not start background process"),
383  errhint("More details may be available in the server log.")));
384  if (status == BGWH_POSTMASTER_DIED)
385  ereport(ERROR,
386  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
387  errmsg("cannot start background processes without postmaster"),
388  errhint("Kill all remaining database processes and restart the database.")));
389  Assert(status == BGWH_STARTED);
390 
391  PG_RETURN_INT32(pid);
392 }
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1126
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:959
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_POSTMASTER_DIED
Definition: bgworker.h:108
@ BGWH_STARTED
Definition: bgworker.h:105
@ BGWH_STOPPED
Definition: bgworker.h:107
signed int int32
Definition: c.h:478
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_INT32(x)
Definition: fmgr.h:354
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
int MyProcPid
Definition: globals.c:44
Assert(fmt[strlen(fmt) - 1] !='\n')

References Assert(), BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWH_POSTMASTER_DIED, BGWH_STARTED, BGWH_STOPPED, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ereport, errcode(), errhint(), errmsg(), ERROR, i, Int32GetDatum(), MyProcPid, PG_GETARG_INT32, PG_RETURN_INT32, PG_RETURN_NULL, RegisterDynamicBackgroundWorker(), snprintf, sprintf, and WaitForBackgroundWorkerStartup().

◆ worker_spi_main()

void worker_spi_main ( Datum  main_arg)

Definition at line 49 of file worker_spi.c.

58 {
59  const char *schema;
60  const char *name;
61 } worktable;
const char * name
Definition: encode.c:571
worktable
Definition: worker_spi.c:61

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 45 of file worker_spi.c.

◆ worktable

worktable

Definition at line 61 of file worker_spi.c.