PostgreSQL Source Code  git master
condition_variable.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "portability/instr_time.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"
Include dependency graph for condition_variable.c:

Go to the source code of this file.

Functions

void ConditionVariableInit (ConditionVariable *cv)
 
void ConditionVariablePrepareToSleep (ConditionVariable *cv)
 
void ConditionVariableSleep (ConditionVariable *cv, uint32 wait_event_info)
 
bool ConditionVariableTimedSleep (ConditionVariable *cv, long timeout, uint32 wait_event_info)
 
void ConditionVariableCancelSleep (void)
 
void ConditionVariableSignal (ConditionVariable *cv)
 
void ConditionVariableBroadcast (ConditionVariable *cv)
 

Variables

static ConditionVariablecv_sleep_target = NULL
 
static WaitEventSetcv_wait_event_set = NULL
 

Function Documentation

◆ ConditionVariableBroadcast()

void ConditionVariableBroadcast ( ConditionVariable cv)

Definition at line 297 of file condition_variable.c.

References Assert, ConditionVariableCancelSleep(), ConditionVariable::mutex, MyProc, PGPROC::pgprocno, PGPROC::procLatch, proclist_contains, proclist_is_empty(), proclist_pop_head_node, proclist_push_tail, SetLatch(), SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_done(), BarrierArriveAndWait(), BarrierDetachImpl(), BitmapDoneInitializingSharedState(), CheckpointerMain(), ReplicationOriginExitCleanup(), ReplicationSlotAcquire(), ReplicationSlotCleanup(), ReplicationSlotCreate(), ReplicationSlotDropPtr(), ReplicationSlotRelease(), replorigin_session_reset(), and replorigin_session_setup().

298 {
299  int pgprocno = MyProc->pgprocno;
300  PGPROC *proc = NULL;
301  bool have_sentinel = false;
302 
303  /*
304  * In some use-cases, it is common for awakened processes to immediately
305  * re-queue themselves. If we just naively try to reduce the wakeup list
306  * to empty, we'll get into a potentially-indefinite loop against such a
307  * process. The semantics we really want are just to be sure that we have
308  * wakened all processes that were in the list at entry. We can use our
309  * own cvWaitLink as a sentinel to detect when we've finished.
310  *
311  * A seeming flaw in this approach is that someone else might signal the
312  * CV and in doing so remove our sentinel entry. But that's fine: since
313  * CV waiters are always added and removed in order, that must mean that
314  * every previous waiter has been wakened, so we're done. We'll get an
315  * extra "set" on our latch from the someone else's signal, which is
316  * slightly inefficient but harmless.
317  *
318  * We can't insert our cvWaitLink as a sentinel if it's already in use in
319  * some other proclist. While that's not expected to be true for typical
320  * uses of this function, we can deal with it by simply canceling any
321  * prepared CV sleep. The next call to ConditionVariableSleep will take
322  * care of re-establishing the lost state.
323  */
324  if (cv_sleep_target != NULL)
326 
327  /*
328  * Inspect the state of the queue. If it's empty, we have nothing to do.
329  * If there's exactly one entry, we need only remove and signal that
330  * entry. Otherwise, remove the first entry and insert our sentinel.
331  */
332  SpinLockAcquire(&cv->mutex);
333  /* While we're here, let's assert we're not in the list. */
334  Assert(!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink));
335 
336  if (!proclist_is_empty(&cv->wakeup))
337  {
338  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
339  if (!proclist_is_empty(&cv->wakeup))
340  {
341  proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
342  have_sentinel = true;
343  }
344  }
345  SpinLockRelease(&cv->mutex);
346 
347  /* Awaken first waiter, if there was one. */
348  if (proc != NULL)
349  SetLatch(&proc->procLatch);
350 
351  while (have_sentinel)
352  {
353  /*
354  * Each time through the loop, remove the first wakeup list entry, and
355  * signal it unless it's our sentinel. Repeat as long as the sentinel
356  * remains in the list.
357  *
358  * Notice that if someone else removes our sentinel, we will waken one
359  * additional process before exiting. That's intentional, because if
360  * someone else signals the CV, they may be intending to waken some
361  * third process that added itself to the list after we added the
362  * sentinel. Better to give a spurious wakeup (which should be
363  * harmless beyond wasting some cycles) than to lose a wakeup.
364  */
365  proc = NULL;
366  SpinLockAcquire(&cv->mutex);
367  if (!proclist_is_empty(&cv->wakeup))
368  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
369  have_sentinel = proclist_contains(&cv->wakeup, pgprocno, cvWaitLink);
370  SpinLockRelease(&cv->mutex);
371 
372  if (proc != NULL && proc != MyProc)
373  SetLatch(&proc->procLatch);
374  }
375 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
void SetLatch(Latch *latch)
Definition: latch.c:457
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableCancelSleep(void)
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
#define proclist_pop_head_node(list, link_member)
Definition: proclist.h:193
#define Assert(condition)
Definition: c.h:738
static bool proclist_is_empty(proclist_head *list)
Definition: proclist.h:38
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
int pgprocno
Definition: proc.h:110
Definition: proc.h:95
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

◆ ConditionVariableCancelSleep()

void ConditionVariableCancelSleep ( void  )

Definition at line 239 of file condition_variable.c.

References ConditionVariableSignal(), cv_sleep_target, ConditionVariable::mutex, MyProc, PGPROC::pgprocno, proclist_contains, proclist_delete, signaled, SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_heapscan(), _bt_parallel_seize(), AbortSubTransaction(), AbortTransaction(), AuxiliaryProcKill(), BackgroundWriterMain(), BarrierArriveAndWait(), BitmapShouldInitializeSharedState(), CheckpointerMain(), ConditionVariableBroadcast(), ConditionVariablePrepareToSleep(), InvalidateObsoleteReplicationSlots(), ProcKill(), ReplicationSlotAcquire(), replorigin_drop(), RequestCheckpoint(), ShutdownAuxiliaryProcess(), WalSndErrorCleanup(), and WalWriterMain().

240 {
242  bool signaled = false;
243 
244  if (cv == NULL)
245  return;
246 
247  SpinLockAcquire(&cv->mutex);
248  if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
249  proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
250  else
251  signaled = true;
252  SpinLockRelease(&cv->mutex);
253 
254  /*
255  * If we've received a signal, pass it on to another waiting process, if
256  * there is one. Otherwise a call to ConditionVariableSignal() might get
257  * lost, despite there being another process ready to handle it.
258  */
259  if (signaled)
261 
262  cv_sleep_target = NULL;
263 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
#define proclist_delete(list, procno, link_member)
Definition: proclist.h:187
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableSignal(ConditionVariable *cv)
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
int pgprocno
Definition: proc.h:110
static volatile sig_atomic_t signaled
Definition: pg_standby.c:52
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

◆ ConditionVariableInit()

void ConditionVariableInit ( ConditionVariable cv)

◆ ConditionVariablePrepareToSleep()

void ConditionVariablePrepareToSleep ( ConditionVariable cv)

Definition at line 61 of file condition_variable.c.

References AddWaitEventToSet(), ConditionVariableCancelSleep(), CreateWaitEventSet(), ConditionVariable::mutex, MyLatch, MyProc, PGINVALID_SOCKET, PGPROC::pgprocno, proclist_push_tail, SpinLockAcquire, SpinLockRelease, TopMemoryContext, ConditionVariable::wakeup, WL_EXIT_ON_PM_DEATH, and WL_LATCH_SET.

Referenced by BarrierArriveAndWait(), ConditionVariableTimedSleep(), ReplicationSlotAcquire(), and RequestCheckpoint().

62 {
63  int pgprocno = MyProc->pgprocno;
64 
65  /*
66  * If first time through in this process, create a WaitEventSet, which
67  * we'll reuse for all condition variable sleeps.
68  */
69  if (cv_wait_event_set == NULL)
70  {
71  WaitEventSet *new_event_set;
72 
73  new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
75  MyLatch, NULL);
77  NULL, NULL);
78  /* Don't set cv_wait_event_set until we have a correct WES. */
79  cv_wait_event_set = new_event_set;
80  }
81 
82  /*
83  * If some other sleep is already prepared, cancel it; this is necessary
84  * because we have just one static variable tracking the prepared sleep,
85  * and also only one cvWaitLink in our PGPROC. It's okay to do this
86  * because whenever control does return to the other test-and-sleep loop,
87  * its ConditionVariableSleep call will just re-establish that sleep as
88  * the prepared one.
89  */
90  if (cv_sleep_target != NULL)
92 
93  /* Record the condition variable on which we will sleep. */
94  cv_sleep_target = cv;
95 
96  /* Add myself to the wait queue. */
97  SpinLockAcquire(&cv->mutex);
98  proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
99  SpinLockRelease(&cv->mutex);
100 }
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
Definition: latch.c:760
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
static WaitEventSet * cv_wait_event_set
WaitEventSet * CreateWaitEventSet(MemoryContext context, int nevents)
Definition: latch.c:563
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ConditionVariableCancelSleep(void)
MemoryContext TopMemoryContext
Definition: mcxt.c:44
#define SpinLockRelease(lock)
Definition: spin.h:64
#define PGINVALID_SOCKET
Definition: port.h:33
static ConditionVariable * cv_sleep_target
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
int pgprocno
Definition: proc.h:110
struct Latch * MyLatch
Definition: globals.c:54
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ ConditionVariableSignal()

void ConditionVariableSignal ( ConditionVariable cv)

Definition at line 274 of file condition_variable.c.

References ConditionVariable::mutex, PGPROC::procLatch, proclist_is_empty(), proclist_pop_head_node, SetLatch(), SpinLockAcquire, SpinLockRelease, and ConditionVariable::wakeup.

Referenced by _bt_parallel_release(), _bt_parallel_scan_and_sort(), and ConditionVariableCancelSleep().

275 {
276  PGPROC *proc = NULL;
277 
278  /* Remove the first process from the wakeup queue (if any). */
279  SpinLockAcquire(&cv->mutex);
280  if (!proclist_is_empty(&cv->wakeup))
281  proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
282  SpinLockRelease(&cv->mutex);
283 
284  /* If we found someone sleeping, set their latch to wake them up. */
285  if (proc != NULL)
286  SetLatch(&proc->procLatch);
287 }
proclist_head wakeup
void SetLatch(Latch *latch)
Definition: latch.c:457
Latch procLatch
Definition: proc.h:104
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define SpinLockRelease(lock)
Definition: spin.h:64
#define proclist_pop_head_node(list, link_member)
Definition: proclist.h:193
static bool proclist_is_empty(proclist_head *list)
Definition: proclist.h:38
Definition: proc.h:95

◆ ConditionVariableSleep()

void ConditionVariableSleep ( ConditionVariable cv,
uint32  wait_event_info 
)

Definition at line 118 of file condition_variable.c.

References ConditionVariableTimedSleep().

Referenced by _bt_parallel_heapscan(), _bt_parallel_seize(), BarrierArriveAndWait(), BitmapShouldInitializeSharedState(), ReplicationSlotAcquire(), replorigin_drop(), and RequestCheckpoint().

119 {
120  (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
121  wait_event_info);
122 }
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)

◆ ConditionVariableTimedSleep()

bool ConditionVariableTimedSleep ( ConditionVariable cv,
long  timeout,
uint32  wait_event_info 
)

Definition at line 132 of file condition_variable.c.

References Assert, CHECK_FOR_INTERRUPTS, ConditionVariablePrepareToSleep(), INSTR_TIME_GET_MILLISEC, INSTR_TIME_SET_CURRENT, INSTR_TIME_SUBTRACT, ConditionVariable::mutex, MyLatch, MyProc, PGPROC::pgprocno, proclist_contains, proclist_push_tail, ResetLatch(), SpinLockAcquire, SpinLockRelease, start_time, WaitEventSetWait(), and ConditionVariable::wakeup.

Referenced by ConditionVariableSleep(), and InvalidateObsoleteReplicationSlots().

134 {
135  long cur_timeout = -1;
137  instr_time cur_time;
138 
139  /*
140  * If the caller didn't prepare to sleep explicitly, then do so now and
141  * return immediately. The caller's predicate loop should immediately
142  * call again if its exit condition is not yet met. This will result in
143  * the exit condition being tested twice before we first sleep. The extra
144  * test can be prevented by calling ConditionVariablePrepareToSleep(cv)
145  * first. Whether it's worth doing that depends on whether you expect the
146  * exit condition to be met initially, in which case skipping the prepare
147  * is recommended because it avoids manipulations of the wait list, or not
148  * met initially, in which case preparing first is better because it
149  * avoids one extra test of the exit condition.
150  *
151  * If we are currently prepared to sleep on some other CV, we just cancel
152  * that and prepare this one; see ConditionVariablePrepareToSleep.
153  */
154  if (cv_sleep_target != cv)
155  {
157  return false;
158  }
159 
160  /*
161  * Record the current time so that we can calculate the remaining timeout
162  * if we are woken up spuriously.
163  */
164  if (timeout >= 0)
165  {
166  INSTR_TIME_SET_CURRENT(start_time);
167  Assert(timeout >= 0 && timeout <= INT_MAX);
168  cur_timeout = timeout;
169  }
170 
171  while (true)
172  {
173  WaitEvent event;
174  bool done = false;
175 
176  /*
177  * Wait for latch to be set. (If we're awakened for some other
178  * reason, the code below will cope anyway.)
179  */
180  (void) WaitEventSetWait(cv_wait_event_set, cur_timeout, &event, 1,
181  wait_event_info);
182 
183  /* Reset latch before examining the state of the wait list. */
185 
187 
188  /*
189  * If this process has been taken out of the wait list, then we know
190  * that it has been signaled by ConditionVariableSignal (or
191  * ConditionVariableBroadcast), so we should return to the caller. But
192  * that doesn't guarantee that the exit condition is met, only that we
193  * ought to check it. So we must put the process back into the wait
194  * list, to ensure we don't miss any additional wakeup occurring while
195  * the caller checks its exit condition. We can take ourselves out of
196  * the wait list only when the caller calls
197  * ConditionVariableCancelSleep.
198  *
199  * If we're still in the wait list, then the latch must have been set
200  * by something other than ConditionVariableSignal; though we don't
201  * guarantee not to return spuriously, we'll avoid this obvious case.
202  */
203  SpinLockAcquire(&cv->mutex);
204  if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
205  {
206  done = true;
207  proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
208  }
209  SpinLockRelease(&cv->mutex);
210 
211  /* We were signaled, so return */
212  if (done)
213  return false;
214 
215  /* If we're not done, update cur_timeout for next iteration */
216  if (timeout >= 0)
217  {
218  INSTR_TIME_SET_CURRENT(cur_time);
219  INSTR_TIME_SUBTRACT(cur_time, start_time);
220  cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
221 
222  /* Have we crossed the timeout threshold? */
223  if (cur_timeout <= 0)
224  return true;
225  }
226  }
227 }
proclist_head wakeup
PGPROC * MyProc
Definition: proc.c:67
#define INSTR_TIME_GET_MILLISEC(t)
Definition: instr_time.h:202
struct timeval instr_time
Definition: instr_time.h:150
static WaitEventSet * cv_wait_event_set
void ResetLatch(Latch *latch)
Definition: latch.c:540
static time_t start_time
Definition: pg_ctl.c:99
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:170
#define SpinLockRelease(lock)
Definition: spin.h:64
static ConditionVariable * cv_sleep_target
#define Assert(condition)
Definition: c.h:738
#define proclist_push_tail(list, procno, link_member)
Definition: proclist.h:191
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:156
int pgprocno
Definition: proc.h:110
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1167
#define proclist_contains(list, procno, link_member)
Definition: proclist.h:195

Variable Documentation

◆ cv_sleep_target

ConditionVariable* cv_sleep_target = NULL
static

Definition at line 31 of file condition_variable.c.

Referenced by ConditionVariableCancelSleep().

◆ cv_wait_event_set

WaitEventSet* cv_wait_event_set = NULL
static

Definition at line 34 of file condition_variable.c.