PostgreSQL Source Code  git master
condition_variable.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "portability/instr_time.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for condition_variable.c:

Go to the source code of this file.

Functions

void ConditionVariableInit (ConditionVariable *cv)
 
void ConditionVariablePrepareToSleep (ConditionVariable *cv)
 
void ConditionVariableSleep (ConditionVariable *cv, uint32 wait_event_info)
 
bool ConditionVariableTimedSleep (ConditionVariable *cv, long timeout, uint32 wait_event_info)
 
void ConditionVariableCancelSleep (void)
 
void ConditionVariableSignal (ConditionVariable *cv)
 
void ConditionVariableBroadcast (ConditionVariable *cv)
 

Variables

static ConditionVariablecv_sleep_target = NULL
 
static WaitEventSetcv_wait_event_set = NULL
 

Function Documentation

◆ ConditionVariableBroadcast()

void ConditionVariableBroadcast ( ConditionVariable cv)

Definition at line 303 of file condition_variable.c.

References Assert, ConditionVariableCancelSleep(), ConditionVariable::mutex, MyProc, PGPROC::pgprocno, PGPROC::procLatch, proclist_contains, proclist_is_empty(), proclist_pop_head_node, proclist_push_tail, SetLatch(), SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_done(), BarrierArriveAndWait(), BarrierDetachImpl(), BitmapDoneInitializingSharedState(), CheckpointerMain(), ReplicationOriginExitCleanup(), ReplicationSlotAcquire(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), replorigin_session_reset(), and replorigin_session_setup().

304 {
305  int pgprocno = MyProc->pgprocno;
306  PGPROC *proc = NULL;
307  bool have_sentinel = false;
308 
309  /*
310  * In some use-cases, it is common for awakened processes to immediately
311  * re-queue themselves. If we just naively try to reduce the wakeup list
312  * to empty, we'll get into a potentially-indefinite loop against such a
313  * process. The semantics we really want are just to be sure that we have
314  * wakened all processes that were in the list at entry. We can use our
315  * own cvWaitLink as a sentinel to detect when we've finished.
316  *
317  * A seeming flaw in this approach is that someone else might signal the
318  * CV and in doing so remove our sentinel entry. But that's fine: since
319  * CV waiters are always added and removed in order, that must mean that
320  * every previous waiter has been wakened, so we're done. We'll get an
321  * extra "set" on our latch from the someone else's signal, which is
322  * slightly inefficient but harmless.
323  *
324  * We can't insert our cvWaitLink as a sentinel if it's already in use in
325  * some other proclist. While that's not expected to be true for typical
326  * uses of this function, we can deal with it by simply canceling any
327  * prepared CV sleep. The next call to ConditionVariableSleep will take
328  * care of re-establishing the lost state.
329  */
330  if (cv_sleep_target != NULL)
332 
333  /*
334  * Inspect the state of the queue. If it's empty, we have nothing to do.
335  * If there's exactly one entry, we need only remove and signal that
336  * entry. Otherwise, remove the first entry and insert our sentinel.
337  */
338  SpinLockAcquire(&cv->mutex);
339  /* While we're here, let's assert we're not in the list. */
340  Assert(!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink));
341 
342  if (!proclist_is_empty(&cv->wakeup))
343  {
344  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
345  if (!proclist_is_empty(&cv->wakeup))
346  {
347  proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
348  have_sentinel = true;
349  }
350  }
351  SpinLockRelease(&cv->mutex);
352 
353  /* Awaken first waiter, if there was one. */
354  if (proc != NULL)
355  SetLatch(&proc->procLatch);
356 
357  while (have_sentinel)
358  {
359  /*
360  * Each time through the loop, remove the first wakeup list entry, and
361  * signal it unless it's our sentinel. Repeat as long as the sentinel
362  * remains in the list.
363  *
364  * Notice that if someone else removes our sentinel, we will waken one
365  * additional process before exiting. That's intentional, because if
366  * someone else signals the CV, they may be intending to waken some
367  * third process that added itself to the list after we added the
368  * sentinel. Better to give a spurious wakeup (which should be
369  * harmless beyond wasting some cycles) than to lose a wakeup.
370  */
371  proc = NULL;
372  SpinLockAcquire(&cv->mutex);
373  if (!proclist_is_empty(&cv->wakeup))
374  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
375  have_sentinel = proclist_contains(&cv->wakeup, pgprocno, cvWaitLink);
376  SpinLockRelease(&cv->mutex);
377 
378  if (proc != NULL && proc != MyProc)
379  SetLatch(&proc->procLatch);
380  }
381 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableCancelSleep(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
#define proclist_pop_head_node(list, link_member)
Definition: proclist.h:193
#define Assert(condition)
Definition: c.h:739
static bool proclist_is_empty(proclist_head *list)
Definition: proclist.h:38
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
int pgprocno
Definition: proc.h:110
Definition: proc.h:95
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

◆ ConditionVariableCancelSleep()

void ConditionVariableCancelSleep ( void  )

Definition at line 245 of file condition_variable.c.

References ConditionVariableSignal(), cv_sleep_target, ConditionVariable::mutex, MyProc, PGPROC::pgprocno, proclist_contains, proclist_delete, signaled, SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_heapscan(), _bt_parallel_seize(), AbortSubTransaction(), AbortTransaction(), AuxiliaryProcKill(), BackgroundWriterMain(), BarrierArriveAndWait(), BitmapShouldInitializeSharedState(), CheckpointerMain(), ConditionVariableBroadcast(), ConditionVariablePrepareToSleep(), ProcKill(), ReplicationSlotAcquire(), replorigin_drop(), RequestCheckpoint(), ShutdownAuxiliaryProcess(), WalSndErrorCleanup(), and WalWriterMain().

246 {
248  bool signaled = false;
249 
250  if (cv == NULL)
251  return;
252 
253  SpinLockAcquire(&cv->mutex);
254  if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
255  proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
256  else
257  signaled = true;
258  SpinLockRelease(&cv->mutex);
259 
260  /*
261  * If we've received a signal, pass it on to another waiting process, if
262  * there is one. Otherwise a call to ConditionVariableSignal() might get
263  * lost, despite there being another process ready to handle it.
264  */
265  if (signaled)
267 
268  cv_sleep_target = NULL;
269 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
#define proclist_delete(list, procno, link_member)
Definition: proclist.h:187
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableSignal(ConditionVariable *cv)
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
int pgprocno
Definition: proc.h:110
static volatile sig_atomic_t signaled
Definition: pg_standby.c:52
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

◆ ConditionVariableInit()

void ConditionVariableInit ( ConditionVariable cv)

◆ ConditionVariablePrepareToSleep()

void ConditionVariablePrepareToSleep ( ConditionVariable cv)

Definition at line 61 of file condition_variable.c.

References AddWaitEventToSet(), ConditionVariableCancelSleep(), CreateWaitEventSet(), ConditionVariable::mutex, MyLatch, MyProc, PGINVALID_SOCKET, PGPROC::pgprocno, proclist_push_tail, ResetLatch(), SpinLockAcquire, SpinLockRelease, TopMemoryContext, ConditionVariable::wakeup, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by BarrierArriveAndWait(), ConditionVariableTimedSleep(), ReplicationSlotAcquire(), and RequestCheckpoint().

62 {
63  int pgprocno = MyProc->pgprocno;
64 
65  /*
66  * If first time through in this process, create a WaitEventSet, which
67  * we'll reuse for all condition variable sleeps.
68  */
69  if (cv_wait_event_set == NULL)
70  {
71  WaitEventSet *new_event_set;
72 
73  new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
75  MyLatch, NULL);
77  NULL, NULL);
78  /* Don't set cv_wait_event_set until we have a correct WES. */
79  cv_wait_event_set = new_event_set;
80  }
81 
82  /*
83  * If some other sleep is already prepared, cancel it; this is necessary
84  * because we have just one static variable tracking the prepared sleep,
85  * and also only one cvWaitLink in our PGPROC. It's okay to do this
86  * because whenever control does return to the other test-and-sleep loop,
87  * its ConditionVariableSleep call will just re-establish that sleep as
88  * the prepared one.
89  */
90  if (cv_sleep_target != NULL)
92 
93  /* Record the condition variable on which we will sleep. */
94  cv_sleep_target = cv;
95 
96  /*
97  * Reset my latch before adding myself to the queue, to ensure that we
98  * don't miss a wakeup that occurs immediately.
99  */
101 
102  /* Add myself to the wait queue. */
103  SpinLockAcquire(&cv->mutex);
104  proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
105  SpinLockRelease(&cv->mutex);
106 }
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:690
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
static WaitEventSet * cv_wait_event_set
void ResetLatch(Latch *latch)
Definition: latch.c:519
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:542
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableCancelSleep(void)
MemoryContext TopMemoryContext
Definition: mcxt.c:44
#define SpinLockRelease(lock)
Definition: spin.h:64
#define PGINVALID_SOCKET
Definition: port.h:33
static ConditionVariable * cv_sleep_target
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
int pgprocno
Definition: proc.h:110
struct Latch * MyLatch
Definition: globals.c:54
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ ConditionVariableSignal()

void ConditionVariableSignal ( ConditionVariable cv)

Definition at line 280 of file condition_variable.c.

References ConditionVariable::mutex, PGPROC::procLatch, proclist_is_empty(), proclist_pop_head_node, SetLatch(), SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_release(), _bt_parallel_scan_and_sort(), and ConditionVariableCancelSleep().

281 {
282  PGPROC *proc = NULL;
283 
284  /* Remove the first process from the wakeup queue (if any). */
285  SpinLockAcquire(&cv->mutex);
286  if (!proclist_is_empty(&cv->wakeup))
287  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
288  SpinLockRelease(&cv->mutex);
289 
290  /* If we found someone sleeping, set their latch to wake them up. */
291  if (proc != NULL)
292  SetLatch(&proc->procLatch);
293 }
proclist_head wakeup
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define proclist_pop_head_node(list, link_member)
Definition: proclist.h:193
static bool proclist_is_empty(proclist_head *list)
Definition: proclist.h:38
Definition: proc.h:95

◆ ConditionVariableSleep()

void ConditionVariableSleep ( ConditionVariable cv,
uint32  wait_event_info 
)

Definition at line 124 of file condition_variable.c.

References ConditionVariableTimedSleep().

Referenced by _bt_parallel_heapscan(), _bt_parallel_seize(), BarrierArriveAndWait(), BitmapShouldInitializeSharedState(), ReplicationSlotAcquire(), replorigin_drop(), and RequestCheckpoint().

125 {
126  (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
127  wait_event_info);
128 }
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)

◆ ConditionVariableTimedSleep()

bool ConditionVariableTimedSleep ( ConditionVariable cv,
long  timeout,
uint32  wait_event_info 
)

Definition at line 138 of file condition_variable.c.

References Assert, CHECK_FOR_INTERRUPTS, ConditionVariablePrepareToSleep(), INSTR_TIME_GET_MILLISEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SUBTRACT, ConditionVariable::mutex, MyLatch, MyProc, PGPROC::pgprocno, proclist_contains, proclist_push_tail, ResetLatch(), SpinLockAcquire, SpinLockRelease, start_time, WaitEventSetWait(), and ConditionVariable::wakeup.

Referenced by ConditionVariableSleep().

140 {
141  long cur_timeout = -1;
143  instr_time cur_time;
144 
145  /*
146  * If the caller didn't prepare to sleep explicitly, then do so now and
147  * return immediately. The caller's predicate loop should immediately
148  * call again if its exit condition is not yet met. This will result in
149  * the exit condition being tested twice before we first sleep. The extra
150  * test can be prevented by calling ConditionVariablePrepareToSleep(cv)
151  * first. Whether it's worth doing that depends on whether you expect the
152  * exit condition to be met initially, in which case skipping the prepare
153  * is recommended because it avoids manipulations of the wait list, or not
154  * met initially, in which case preparing first is better because it
155  * avoids one extra test of the exit condition.
156  *
157  * If we are currently prepared to sleep on some other CV, we just cancel
158  * that and prepare this one; see ConditionVariablePrepareToSleep.
159  */
160  if (cv_sleep_target != cv)
161  {
163  return false;
164  }
165 
166  /*
167  * Record the current time so that we can calculate the remaining timeout
168  * if we are woken up spuriously.
169  */
170  if (timeout >= 0)
171  {
172  INSTR_TIME_SET_CURRENT(start_time);
173  Assert(timeout >= 0 && timeout <= INT_MAX);
174  cur_timeout = timeout;
175  }
176 
177  while (true)
178  {
179  WaitEvent event;
180  bool done = false;
181 
182  /*
183  * Wait for latch to be set. (If we're awakened for some other
184  * reason, the code below will cope anyway.)
185  */
186  (void) WaitEventSetWait(cv_wait_event_set, cur_timeout, &event, 1,
187  wait_event_info);
188 
189  /* Reset latch before examining the state of the wait list. */
191 
193 
194  /*
195  * If this process has been taken out of the wait list, then we know
196  * that it has been signaled by ConditionVariableSignal (or
197  * ConditionVariableBroadcast), so we should return to the caller. But
198  * that doesn't guarantee that the exit condition is met, only that we
199  * ought to check it. So we must put the process back into the wait
200  * list, to ensure we don't miss any additional wakeup occurring while
201  * the caller checks its exit condition. We can take ourselves out of
202  * the wait list only when the caller calls
203  * ConditionVariableCancelSleep.
204  *
205  * If we're still in the wait list, then the latch must have been set
206  * by something other than ConditionVariableSignal; though we don't
207  * guarantee not to return spuriously, we'll avoid this obvious case.
208  */
209  SpinLockAcquire(&cv->mutex);
210  if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
211  {
212  done = true;
213  proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
214  }
215  SpinLockRelease(&cv->mutex);
216 
217  /* We were signaled, so return */
218  if (done)
219  return false;
220 
221  /* If we're not done, update cur_timeout for next iteration */
222  if (timeout >= 0)
223  {
224  INSTR_TIME_SET_CURRENT(cur_time);
225  INSTR_TIME_SUBTRACT(cur_time, start_time);
226  cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
227 
228  /* Have we crossed the timeout threshold? */
229  if (cur_timeout <= 0)
230  return true;
231  }
232  }
233 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
#define INSTR_TIME_GET_MILLISEC(t)
Definition: instr_time.h:202
struct timeval instr_time
Definition: instr_time.h:150
static WaitEventSet * cv_wait_event_set
void ResetLatch(Latch *latch)
Definition: latch.c:519
static time_t start_time
Definition: pg_ctl.c:99
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:170
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
#define Assert(condition)
Definition: c.h:739
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:156
int pgprocno
Definition: proc.h:110
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:953
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

Variable Documentation

◆ cv_sleep_target

ConditionVariable* cv_sleep_target = NULL
static

Definition at line 31 of file condition_variable.c.

Referenced by ConditionVariableCancelSleep().

◆ cv_wait_event_set

WaitEventSet* cv_wait_event_set = NULL
static

Definition at line 34 of file condition_variable.c.