PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker_spi.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
Include dependency graph for worker_spi.c:

Go to the source code of this file.

Functions

 PG_FUNCTION_INFO_V1 (worker_spi_launch)
 
void _PG_init (void)
 
void worker_spi_main (Datum)
 
static void worker_spi_sigterm (SIGNAL_ARGS)
 
static void worker_spi_sighup (SIGNAL_ARGS)
 
static void initialize_worker_spi (worktable *table)
 
Datum worker_spi_launch (PG_FUNCTION_ARGS)
 

Variables

 PG_MODULE_MAGIC
 
 worktable
 

Function Documentation

void _PG_init ( void  )

Definition at line 310 of file worker_spi.c.

References auth_delay_checks(), auth_delay_milliseconds, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ClientAuthentication_hook, DefineCustomIntVariable(), GUC_UNIT_MS, i, Int32GetDatum, NULL, original_client_auth_hook, PGC_POSTMASTER, PGC_SIGHUP, process_shared_preload_libraries_in_progress, RegisterBackgroundWorker(), and snprintf().

311 {
312  BackgroundWorker worker;
313  unsigned int i;
314 
315  /* get the configuration */
316  DefineCustomIntVariable("worker_spi.naptime",
317  "Duration between each check (in seconds).",
318  NULL,
319  &worker_spi_naptime,
320  10,
321  1,
322  INT_MAX,
323  PGC_SIGHUP,
324  0,
325  NULL,
326  NULL,
327  NULL);
328 
330  return;
331 
332  DefineCustomIntVariable("worker_spi.total_workers",
333  "Number of workers.",
334  NULL,
335  &worker_spi_total_workers,
336  2,
337  1,
338  100,
340  0,
341  NULL,
342  NULL,
343  NULL);
344 
345  /* set up common data for all our workers */
346  memset(&worker, 0, sizeof(worker));
351  sprintf(worker.bgw_library_name, "worker_spi");
352  sprintf(worker.bgw_function_name, "worker_spi_main");
353  worker.bgw_notify_pid = 0;
354 
355  /*
356  * Now fill in worker-specific data, and do the actual registrations.
357  */
358  for (i = 1; i <= worker_spi_total_workers; i++)
359  {
360  snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
361  worker.bgw_main_arg = Int32GetDatum(i);
362 
363  RegisterBackgroundWorker(&worker);
364  }
365 }
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:7755
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:849
bool process_shared_preload_libraries_in_progress
Definition: miscinit.c:1417
int bgw_restart_time
Definition: bgworker.h:93
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
Datum bgw_main_arg
Definition: bgworker.h:96
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
Definition: guc.h:72
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
#define Int32GetDatum(X)
Definition: postgres.h:485
pid_t bgw_notify_pid
Definition: bgworker.h:98
int i
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
static void initialize_worker_spi ( worktable table)
static

Definition at line 103 of file worker_spi.c.

References appendStringInfo(), buf, CommitTransactionCommand(), StringInfoData::data, DatumGetInt64, elog, FATAL, GetTransactionSnapshot(), initStringInfo(), NULL, pgstat_report_activity(), PopActiveSnapshot(), PushActiveSnapshot(), resetStringInfo(), SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_SELECT, SPI_OK_UTILITY, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, and SPITupleTable::vals.

104 {
105  int ret;
106  int ntup;
107  bool isnull;
109 
112  SPI_connect();
114  pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
115 
116  /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
117  initStringInfo(&buf);
118  appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
119  table->schema);
120 
121  ret = SPI_execute(buf.data, true, 0);
122  if (ret != SPI_OK_SELECT)
123  elog(FATAL, "SPI_execute failed: error code %d", ret);
124 
125  if (SPI_processed != 1)
126  elog(FATAL, "not a singleton result");
127 
130  1, &isnull));
131  if (isnull)
132  elog(FATAL, "null result");
133 
134  if (ntup == 0)
135  {
136  resetStringInfo(&buf);
137  appendStringInfo(&buf,
138  "CREATE SCHEMA \"%s\" "
139  "CREATE TABLE \"%s\" ("
140  " type text CHECK (type IN ('total', 'delta')), "
141  " value integer)"
142  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
143  "WHERE type = 'total'",
144  table->schema, table->name, table->name, table->name);
145 
146  /* set statement start time */
148 
149  ret = SPI_execute(buf.data, false, 0);
150 
151  if (ret != SPI_OK_UTILITY)
152  elog(FATAL, "failed to create my schema");
153  }
154 
155  SPI_finish();
159 }
int SPI_connect(void)
Definition: spi.c:84
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
void CommitTransactionCommand(void)
Definition: xact.c:2749
int SPI_finish(void)
Definition: spi.c:148
SPITupleTable * SPI_tuptable
Definition: spi.c:41
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
HeapTuple * vals
Definition: spi.h:28
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:300
uint64 SPI_processed
Definition: spi.c:39
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define FATAL
Definition: elog.h:52
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
Definition: spi.c:836
#define DatumGetInt64(X)
Definition: postgres.h:613
static char * buf
Definition: pg_test_fsync.c:66
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:728
#define SPI_OK_UTILITY
Definition: spi.h:53
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
TupleDesc tupdesc
Definition: spi.h:27
#define SPI_OK_SELECT
Definition: spi.h:54
#define NULL
Definition: c.h:229
void StartTransactionCommand(void)
Definition: xact.c:2679
void SetCurrentStatementStartTimestamp(void)
Definition: xact.c:740
#define elog
Definition: elog.h:219
int SPI_execute(const char *src, bool read_only, long tcount)
Definition: spi.c:304
PG_FUNCTION_INFO_V1 ( worker_spi_launch  )
Datum worker_spi_launch ( PG_FUNCTION_ARGS  )

Definition at line 371 of file worker_spi.c.

References Assert, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWH_POSTMASTER_DIED, BGWH_STARTED, BGWH_STOPPED, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ereport, errcode(), errhint(), errmsg(), ERROR, i, Int32GetDatum, MyProcPid, PG_GETARG_INT32, PG_RETURN_INT32, PG_RETURN_NULL, RegisterDynamicBackgroundWorker(), snprintf(), status(), and WaitForBackgroundWorkerStartup().

372 {
373  int32 i = PG_GETARG_INT32(0);
374  BackgroundWorker worker;
375  BackgroundWorkerHandle *handle;
377  pid_t pid;
378 
379  memset(&worker, 0, sizeof(worker));
384  sprintf(worker.bgw_library_name, "worker_spi");
385  sprintf(worker.bgw_function_name, "worker_spi_main");
386  snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
387  worker.bgw_main_arg = Int32GetDatum(i);
388  /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
389  worker.bgw_notify_pid = MyProcPid;
390 
391  if (!RegisterDynamicBackgroundWorker(&worker, &handle))
392  PG_RETURN_NULL();
393 
394  status = WaitForBackgroundWorkerStartup(handle, &pid);
395 
396  if (status == BGWH_STOPPED)
397  ereport(ERROR,
398  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
399  errmsg("could not start background process"),
400  errhint("More details may be available in the server log.")));
401  if (status == BGWH_POSTMASTER_DIED)
402  ereport(ERROR,
403  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
404  errmsg("cannot start background processes without postmaster"),
405  errhint("Kill all remaining database processes and restart the database.")));
406  Assert(status == BGWH_STARTED);
407 
408  PG_RETURN_INT32(pid);
409 }
#define PG_GETARG_INT32(n)
Definition: fmgr.h:234
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1090
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
#define PG_RETURN_INT32(x)
Definition: fmgr.h:314
int bgw_restart_time
Definition: bgworker.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
signed int int32
Definition: c.h:256
Datum bgw_main_arg
Definition: bgworker.h:96
#define ERROR
Definition: elog.h:43
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
BgwHandleStatus
Definition: bgworker.h:101
#define ereport(elevel, rest)
Definition: elog.h:122
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:675
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:933
#define Int32GetDatum(X)
Definition: postgres.h:485
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:98
int i
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
#define PG_RETURN_NULL()
Definition: fmgr.h:305
void worker_spi_main ( Datum  main_arg)

Definition at line 49 of file worker_spi.c.

References name.

61 {
62  const char *schema;
63  const char *name;
64 } worktable;
worktable
Definition: worker_spi.c:64
const char * name
Definition: encode.c:521
static void worker_spi_sighup ( SIGNAL_ARGS  )
static

Definition at line 88 of file worker_spi.c.

References MyLatch, and SetLatch().

89 {
90  int save_errno = errno;
91 
92  got_sighup = true;
94 
95  errno = save_errno;
96 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
struct Latch * MyLatch
Definition: globals.c:51
static void worker_spi_sigterm ( SIGNAL_ARGS  )
static

Definition at line 72 of file worker_spi.c.

References MyLatch, and SetLatch().

73 {
74  int save_errno = errno;
75 
76  got_sigterm = true;
78 
79  errno = save_errno;
80 }
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
struct Latch * MyLatch
Definition: globals.c:51

Variable Documentation

PG_MODULE_MAGIC

Definition at line 44 of file worker_spi.c.

worktable

Definition at line 64 of file worker_spi.c.