PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
setup.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_toc.h"
#include "utils/memutils.h"
#include "test_shm_mq.h"
Include dependency graph for setup.c:

Go to the source code of this file.

Data Structures

struct  worker_state
 

Functions

static void setup_dynamic_shared_memory (int64 queue_size, int nworkers, dsm_segment **segp, test_shm_mq_header **hdrp, shm_mq **outp, shm_mq **inp)
 
static worker_state * setup_background_workers (int nworkers, dsm_segment *seg)
 
static void cleanup_background_workers (dsm_segment *seg, Datum arg)
 
static void wait_for_workers_to_become_ready (worker_state *wstate, volatile test_shm_mq_header *hdr)
 
static bool check_worker_status (worker_state *wstate)
 
void test_shm_mq_setup (int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
 

Function Documentation

static bool check_worker_status ( worker_state * wstate)
static

Definition at line 299 of file setup.c.

References BGWH_POSTMASTER_DIED, BGWH_STOPPED, GetBackgroundWorkerPid(), worker_state::handle, worker_state::nworkers, and status().

Referenced by wait_for_workers_to_become_ready().

300 {
301  int n;
302 
303  /* If any workers (or the postmaster) have died, we have failed. */
304  for (n = 0; n < wstate->nworkers; ++n)
305  {
306  BgwHandleStatus status;
307  pid_t pid;
308 
309  status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
310  if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
311  return false;
312  }
313 
314  /* Otherwise, things still look OK. */
315  return true;
316 }
int nworkers
Definition: setup.c:29
BackgroundWorkerHandle * handle[FLEXIBLE_ARRAY_MEMBER]
Definition: setup.c:30
BgwHandleStatus
Definition: bgworker.h:102
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:224
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1004
static void cleanup_background_workers ( dsm_segment * seg,
Datum  arg 
)
static

Definition at line 244 of file setup.c.

References DatumGetPointer, worker_state::handle, worker_state::nworkers, and TerminateBackgroundWorker().

Referenced by setup_background_workers(), and test_shm_mq_setup().

245 {
246  worker_state *wstate = (worker_state *) DatumGetPointer(arg);
247 
248  while (wstate->nworkers > 0)
249  {
250  --wstate->nworkers;
251  TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
252  }
253 }
int nworkers
Definition: setup.c:29
BackgroundWorkerHandle * handle[FLEXIBLE_ARRAY_MEMBER]
Definition: setup.c:30
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
Definition: bgworker.c:1134
#define DatumGetPointer(X)
Definition: postgres.h:555
void * arg
static worker_state * setup_background_workers ( int  nworkers,
dsm_segment * seg 
)
static

Definition at line 173 of file setup.c.

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_SHMEM_ACCESS, BgWorkerStart_ConsistentState, cleanup_background_workers(), CurTransactionContext, dsm_segment_handle(), ereport, errcode(), errhint(), errmsg(), ERROR, worker_state::handle, i, MemoryContextAlloc(), MemoryContextSwitchTo(), MyProcPid, NULL, worker_state::nworkers, offsetof, on_dsm_detach(), PointerGetDatum, RegisterDynamicBackgroundWorker(), snprintf(), TopTransactionContext, and UInt32GetDatum.

Referenced by test_shm_mq_setup().

174 {
175  MemoryContext oldcontext;
176  BackgroundWorker worker;
177  worker_state *wstate;
178  int i;
179 
180  /*
181  * We need the worker_state object and the background worker handles to
182  * which it points to be allocated in CurTransactionContext rather than
183  * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
184  * hooks run.
185  */
186  oldcontext = MemoryContextSwitchTo(CurTransactionContext);
187 
188  /* Create worker state object. */
189  wstate = MemoryContextAlloc(TopTransactionContext,
190  offsetof(worker_state, handle) +
191  sizeof(BackgroundWorkerHandle *) * nworkers);
192  wstate->nworkers = 0;
193 
194  /*
195  * Arrange to kill all the workers if we abort before all workers are
196  * finished hooking themselves up to the dynamic shared memory segment.
197  *
198  * If we die after all the workers have finished hooking themselves up to
199  * the dynamic shared memory segment, we'll mark the two queues to which
200  * we're directly connected as detached, and the worker(s) connected to
201  * those queues will exit, marking any other queues to which they are
202  * connected as detached. This will cause any as-yet-unaware workers
203  * connected to those queues to exit in their turn, and so on, until
204  * everybody exits.
205  *
206  * But suppose the workers which are supposed to connect to the queues to
207  * which we're directly attached exit due to some error before they
208  * actually attach the queues. The remaining workers will have no way of
209  * knowing this. From their perspective, they're still waiting for those
210  * workers to start, when in fact they've already died.
211  */
212  on_dsm_detach(seg, cleanup_background_workers,
213  PointerGetDatum(wstate));
214 
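215  /* Configure a worker. */
216  worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
217  worker.bgw_start_time = BgWorkerStart_ConsistentState;
218  worker.bgw_restart_time = BGW_NEVER_RESTART;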
219  worker.bgw_main = NULL; /* new worker might not have library loaded */
220  sprintf(worker.bgw_library_name, "test_shm_mq");
221  sprintf(worker.bgw_function_name, "test_shm_mq_main");
222  snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq");
223  worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
224  /* set bgw_notify_pid, so we can detect if the worker stops */
225  worker.bgw_notify_pid = MyProcPid;
226 
227  /* Register the workers. */
228  for (i = 0; i < nworkers; ++i)
229  {
230  if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
231  ereport(ERROR,
232  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
233  errmsg("could not register background process"),
234  errhint("You may need to increase max_worker_processes.")));
235  ++wstate->nworkers;
236  }
237 
238  /* All done. */
239  MemoryContextSwitchTo(oldcontext);
240  return wstate;
241 }
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
MemoryContext TopTransactionContext
Definition: mcxt.c:48
#define PointerGetDatum(X)
Definition: postgres.h:562
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
MemoryContext CurTransactionContext
Definition: mcxt.c:49
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition: dsm.c:1016
int bgw_restart_time
Definition: bgworker.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1025
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
Datum bgw_main_arg
Definition: bgworker.h:97
#define ERROR
Definition: elog.h:43
bgworker_main_type bgw_main
Definition: bgworker.h:94
int nworkers
Definition: setup.c:29
BackgroundWorkerHandle * handle[FLEXIBLE_ARRAY_MEMBER]
Definition: setup.c:30
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define UInt32GetDatum(X)
Definition: postgres.h:499
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:899
static void cleanup_background_workers(dsm_segment *seg, Datum arg)
Definition: setup.c:244
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:99
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
int i
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define offsetof(type, field)
Definition: c.h:555
static void setup_dynamic_shared_memory ( int64  queue_size,
int  nworkers,
dsm_segment **  segp,
test_shm_mq_header **  hdrp,
shm_mq **  outp,
shm_mq **  inp 
)
static

Definition at line 90 of file setup.c.

References dsm_create(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, i, test_shm_mq_header::mutex, MyProc, PG_TEST_SHM_MQ_MAGIC, shm_mq_create(), shm_mq_minimum_size, shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit, test_shm_mq_header::workers_attached, test_shm_mq_header::workers_ready, and test_shm_mq_header::workers_total.

Referenced by test_shm_mq_setup().

93 {
94  shm_toc_estimator e;
95  int i;
96  Size segsize;
97  dsm_segment *seg;
98  shm_toc *toc;
99  test_shm_mq_header *hdr;
100 
101  /* Ensure a valid queue size. */
102  if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
103  ereport(ERROR,
104  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
105  errmsg("queue size must be at least %zu bytes",
106  shm_mq_minimum_size)));
107  if (queue_size != ((Size) queue_size))
108  ereport(ERROR,
109  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
110  errmsg("queue size overflows size_t")));
111 
112  /*
113  * Estimate how much shared memory we need.
114  *
115  * Because the TOC machinery may choose to insert padding of oddly-sized
116  * requests, we must estimate each chunk separately.
117  *
118  * We need one key to register the location of the header, and we need
119  * nworkers + 1 keys to track the locations of the message queues.
120  */
121  shm_toc_initialize_estimator(&e);
122  shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
123  for (i = 0; i <= nworkers; ++i)
124  shm_toc_estimate_chunk(&e, (Size) queue_size);
125  shm_toc_estimate_keys(&e, 2 + nworkers);
126  segsize = shm_toc_estimate(&e);
127 
128  /* Create the shared memory segment and establish a table of contents. */
129  seg = dsm_create(shm_toc_estimate(&e), 0);
130  toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
131  segsize);
132 
133  /* Set up the header region. */
134  hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
135  SpinLockInit(&hdr->mutex);
136  hdr->workers_total = nworkers;
137  hdr->workers_attached = 0;
138  hdr->workers_ready = 0;
139  shm_toc_insert(toc, 0, hdr);
140 
141  /* Set up one message queue per worker, plus one. */
142  for (i = 0; i <= nworkers; ++i)
143  {
144  shm_mq *mq;
145 
146  mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
147  (Size) queue_size);
148  shm_toc_insert(toc, i + 1, mq);
149 
150  if (i == 0)
151  {
152  /* We send messages to the first queue. */
153  shm_mq_set_sender(mq, MyProc);
154  *outp = mq;
155  }
156  if (i == nworkers)
157  {
158  /* We receive messages from the last queue. */
159  shm_mq_set_receiver(mq, MyProc);
160  *inp = mq;
161  }
162  }
163 
164  /* Return results to caller. */
165  *segp = seg;
166  *hdrp = hdr;
167 }
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Definition: shm_toc.c:40
PGPROC * MyProc
Definition: proc.c:67
#define SpinLockInit(lock)
Definition: spin.h:60
int errcode(int sqlerrcode)
Definition: elog.c:575
Size shm_toc_estimate(shm_toc_estimator *e)
Definition: shm_toc.c:241
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:49
const Size shm_mq_minimum_size
Definition: shm_mq.c:158
#define ERROR
Definition: elog.h:43
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:167
#define shm_toc_initialize_estimator(e)
Definition: shm_toc.h:47
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:215
#define ereport(elevel, rest)
Definition: elog.h:122
dsm_segment * dsm_create(Size size, int flags)
Definition: dsm.c:458
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:988
size_t Size
Definition: c.h:356
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:52
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:83
e
Definition: preproc-init.c:82
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:161
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
Definition: shm_mq.c:69
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:196
#define PG_TEST_SHM_MQ_MAGIC
Definition: test_shm_mq.h:22
void test_shm_mq_setup ( int64  queue_size,
int32  nworkers,
dsm_segment **  segp,
shm_mq_handle **  output,
shm_mq_handle **  input 
)

Definition at line 49 of file setup.c.

References cancel_on_dsm_detach(), cleanup_background_workers(), worker_state::handle, NULL, pfree(), PointerGetDatum, setup_background_workers(), setup_dynamic_shared_memory(), shm_mq_attach(), and wait_for_workers_to_become_ready().

Referenced by test_shm_mq(), and test_shm_mq_pipelined().

51 {
52  dsm_segment *seg;
53  test_shm_mq_header *hdr;
54  shm_mq *outq = NULL; /* placate compiler */
55  shm_mq *inq = NULL; /* placate compiler */
56  worker_state *wstate;
57 
58  /* Set up a dynamic shared memory segment. */
59  setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
60  *segp = seg;
61 
62  /* Register background workers. */
63  wstate = setup_background_workers(nworkers, seg);
64 
65  /* Attach the queues. */
66  *output = shm_mq_attach(outq, seg, wstate->handle[0]);
67  *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
68 
69  /* Wait for workers to become ready. */
70  wait_for_workers_to_become_ready(wstate, hdr);
71 
72  /*
73  * Once we reach this point, all workers are ready. We no longer need to
74  * kill them if we die; they'll die on their own as the message queues
75  * shut down.
76  */
77  cancel_on_dsm_detach(seg, cleanup_background_workers,
78  PointerGetDatum(wstate));
79  pfree(wstate);
80 }
#define PointerGetDatum(X)
Definition: postgres.h:562
static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, dsm_segment **segp, test_shm_mq_header **hdrp, shm_mq **outp, shm_mq **inp)
Definition: setup.c:90
static void wait_for_workers_to_become_ready(worker_state *wstate, volatile test_shm_mq_header *hdr)
Definition: setup.c:256
void pfree(void *pointer)
Definition: mcxt.c:950
static worker_state * setup_background_workers(int nworkers, dsm_segment *seg)
Definition: setup.c:173
BackgroundWorkerHandle * handle[FLEXIBLE_ARRAY_MEMBER]
Definition: setup.c:30
#define NULL
Definition: c.h:229
static void cleanup_background_workers(dsm_segment *seg, Datum arg)
Definition: setup.c:244
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:284
Definition: shm_mq.c:69
void cancel_on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
Definition: dsm.c:1040
static void wait_for_workers_to_become_ready ( worker_state * wstate,
volatile test_shm_mq_header * hdr 
)
static

Definition at line 256 of file setup.c.

References CHECK_FOR_INTERRUPTS, check_worker_status(), ereport, errcode(), errmsg(), ERROR, test_shm_mq_header::mutex, MyLatch, worker_state::nworkers, PG_WAIT_EXTENSION, ResetLatch(), result, SpinLockAcquire, SpinLockRelease, WaitLatch(), WL_LATCH_SET, and test_shm_mq_header::workers_ready.

Referenced by test_shm_mq_setup().

258 {
259  bool result = false;
260 
261  for (;;)
262  {
263  int workers_ready;
264 
265  /* If all the workers are ready, we have succeeded. */
266  SpinLockAcquire(&hdr->mutex);
267  workers_ready = hdr->workers_ready;
268  SpinLockRelease(&hdr->mutex);
269  if (workers_ready >= wstate->nworkers)
270  {
271  result = true;
272  break;
273  }
274 
275  /* If any workers (or the postmaster) have died, we have failed. */
276  if (!check_worker_status(wstate))
277  {
278  result = false;
279  break;
280  }
281 
282  /* Wait to be signalled. */
283  WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
284 
285  /* Reset the latch so we don't spin. */
286  ResetLatch(MyLatch);
287 
288  /* An interrupt may have occurred while we were waiting. */
289  CHECK_FOR_INTERRUPTS();
290  }
291 
292  if (!result)
293  ereport(ERROR,
294  (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
295  errmsg("one or more background workers failed to start")));
296 }
int errcode(int sqlerrcode)
Definition: elog.c:575
return result
Definition: formatting.c:1618
void ResetLatch(volatile Latch *latch)
Definition: latch.c:461
#define SpinLockAcquire(lock)
Definition: spin.h:62
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:300
#define ERROR
Definition: elog.h:43
static bool check_worker_status(worker_state *wstate)
Definition: setup.c:299
int nworkers
Definition: setup.c:29
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
#define PG_WAIT_EXTENSION
Definition: pgstat.h:742
int errmsg(const char *fmt,...)
Definition: elog.c:797
struct Latch * MyLatch
Definition: globals.c:51
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define WL_LATCH_SET
Definition: latch.h:124