PostgreSQL Source Code  git master
barrier.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * barrier.c
 *	  Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall-through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */
85 
86 #include "postgres.h"
87 #include "storage/barrier.h"
88 
89 static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
90 
91 /*
92  * Initialize this barrier. To use a static party size, provide the number of
93  * participants to wait for at each phase indicating that that number of
94  * backends is implicitly attached. To use a dynamic party size, specify zero
95  * here and then use BarrierAttach() and
96  * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
97  * participants explicitly.
98  */
99 void
100 BarrierInit(Barrier *barrier, int participants)
101 {
102  SpinLockInit(&barrier->mutex);
103  barrier->participants = participants;
104  barrier->arrived = 0;
105  barrier->phase = 0;
106  barrier->elected = 0;
107  barrier->static_party = participants > 0;
108  ConditionVariableInit(&barrier->condition_variable);
109 }
110 
111 /*
112  * Arrive at this barrier, wait for all other attached participants to arrive
113  * too and then return. Increments the current phase. The caller must be
114  * attached.
115  *
116  * While waiting, pg_stat_activity shows a wait_event_type and wait_event
117  * controlled by the wait_event_info passed in, which should be a value from
118  * one of the WaitEventXXX enums defined in pgstat.h.
119  *
120  * Return true in one arbitrarily chosen participant. Return false in all
121  * others. The return code can be used to elect one participant to execute a
122  * phase of work that must be done serially while other participants wait.
123  */
124 bool
126 {
127  bool release = false;
128  bool elected;
129  int start_phase;
130  int next_phase;
131 
132  SpinLockAcquire(&barrier->mutex);
133  start_phase = barrier->phase;
134  next_phase = start_phase + 1;
135  ++barrier->arrived;
136  if (barrier->arrived == barrier->participants)
137  {
138  release = true;
139  barrier->arrived = 0;
140  barrier->phase = next_phase;
141  barrier->elected = next_phase;
142  }
143  SpinLockRelease(&barrier->mutex);
144 
145  /*
146  * If we were the last expected participant to arrive, we can release our
147  * peers and return true to indicate that this backend has been elected to
148  * perform any serial work.
149  */
150  if (release)
151  {
152  ConditionVariableBroadcast(&barrier->condition_variable);
153 
154  return true;
155  }
156 
157  /*
158  * Otherwise we have to wait for the last participant to arrive and
159  * advance the phase.
160  */
161  elected = false;
162  ConditionVariablePrepareToSleep(&barrier->condition_variable);
163  for (;;)
164  {
165  /*
166  * We know that phase must either be start_phase, indicating that we
167  * need to keep waiting, or next_phase, indicating that the last
168  * participant that we were waiting for has either arrived or detached
169  * so that the next phase has begun. The phase cannot advance any
170  * further than that without this backend's participation, because
171  * this backend is attached.
172  */
173  SpinLockAcquire(&barrier->mutex);
174  Assert(barrier->phase == start_phase || barrier->phase == next_phase);
175  release = barrier->phase == next_phase;
176  if (release && barrier->elected != next_phase)
177  {
178  /*
179  * Usually the backend that arrives last and releases the other
180  * backends is elected to return true (see above), so that it can
181  * begin processing serial work while it has a CPU timeslice.
182  * However, if the barrier advanced because someone detached, then
183  * one of the backends that is awoken will need to be elected.
184  */
185  barrier->elected = barrier->phase;
186  elected = true;
187  }
188  SpinLockRelease(&barrier->mutex);
189  if (release)
190  break;
191  ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
192  }
194 
195  return elected;
196 }
197 
198 /*
199  * Arrive at this barrier, but detach rather than waiting. Returns true if
200  * the caller was the last to detach.
201  */
202 bool
204 {
205  return BarrierDetachImpl(barrier, true);
206 }
207 
208 /*
209  * Arrive at a barrier, and detach all but the last to arrive. Returns true if
210  * the caller was the last to arrive, and is therefore still attached.
211  */
212 bool
214 {
215  SpinLockAcquire(&barrier->mutex);
216  if (barrier->participants > 1)
217  {
218  --barrier->participants;
219  SpinLockRelease(&barrier->mutex);
220 
221  return false;
222  }
223  Assert(barrier->participants == 1);
224  ++barrier->phase;
225  SpinLockRelease(&barrier->mutex);
226 
227  return true;
228 }
229 
230 /*
231  * Attach to a barrier. All waiting participants will now wait for this
232  * participant to call BarrierArriveAndWait(), BarrierDetach() or
233  * BarrierArriveAndDetach(). Return the current phase.
234  */
235 int
237 {
238  int phase;
239 
240  Assert(!barrier->static_party);
241 
242  SpinLockAcquire(&barrier->mutex);
243  ++barrier->participants;
244  phase = barrier->phase;
245  SpinLockRelease(&barrier->mutex);
246 
247  return phase;
248 }
249 
250 /*
251  * Detach from a barrier. This may release other waiters from
252  * BarrierArriveAndWait() and advance the phase if they were only waiting for
253  * this backend. Return true if this participant was the last to detach.
254  */
255 bool
257 {
258  return BarrierDetachImpl(barrier, false);
259 }
260 
261 /*
262  * Return the current phase of a barrier. The caller must be attached.
263  */
264 int
266 {
267  /*
268  * It is OK to read barrier->phase without locking, because it can't
269  * change without us (we are attached to it), and we executed a memory
270  * barrier when we either attached or participated in changing it last
271  * time.
272  */
273  return barrier->phase;
274 }
275 
276 /*
277  * Return an instantaneous snapshot of the number of participants currently
278  * attached to this barrier. For debugging purposes only.
279  */
280 int
282 {
283  int participants;
284 
285  SpinLockAcquire(&barrier->mutex);
286  participants = barrier->participants;
287  SpinLockRelease(&barrier->mutex);
288 
289  return participants;
290 }
291 
292 /*
293  * Detach from a barrier. If 'arrive' is true then also increment the phase
294  * if there are no other participants. If there are other participants
295  * waiting, then the phase will be advanced and they'll be released if they
296  * were only waiting for the caller. Return true if this participant was the
297  * last to detach.
298  */
299 static inline bool
301 {
302  bool release;
303  bool last;
304 
305  Assert(!barrier->static_party);
306 
307  SpinLockAcquire(&barrier->mutex);
308  Assert(barrier->participants > 0);
309  --barrier->participants;
310 
311  /*
312  * If any other participants are waiting and we were the last participant
313  * waited for, release them. If no other participants are waiting, but
314  * this is a BarrierArriveAndDetach() call, then advance the phase too.
315  */
316  if ((arrive || barrier->participants > 0) &&
317  barrier->arrived == barrier->participants)
318  {
319  release = true;
320  barrier->arrived = 0;
321  ++barrier->phase;
322  }
323  else
324  release = false;
325 
326  last = barrier->participants == 0;
327  SpinLockRelease(&barrier->mutex);
328 
329  if (release)
330  ConditionVariableBroadcast(&barrier->condition_variable);
331 
332  return last;
333 }
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
int BarrierParticipants(Barrier *barrier)
Definition: barrier.c:281
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
static bool BarrierDetachImpl(Barrier *barrier, bool arrive)
Definition: barrier.c:300
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:256
unsigned int uint32
Definition: c.h:441
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
Assert(fmt[strlen(fmt) - 1] !='\n')
static THREAD_BARRIER_T barrier
Definition: pgbench.c:492
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62