PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
barrier.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * barrier.c
4 * Barriers for synchronizing cooperating processes.
5 *
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
10 * synchronization method. A barrier for a group of threads or processes in
11 * the source code means any thread/process must stop at this point and cannot
12 * proceed until all other threads/processes reach this barrier."
13 *
14 * This implementation of barriers allows for static sets of participants
15 * known up front, or dynamic sets of participants which processes can join or
16 * leave at any time. In the dynamic case, a phase number can be used to
17 * track progress through a parallel algorithm, and may be necessary to
18 * synchronize with the current phase of a multi-phase algorithm when a new
19 * participant joins. In the static case, the phase number is used
20 * internally, but it isn't strictly necessary for client code to access it
21 * because the phase can only advance when the declared number of participants
22 * reaches the barrier, so client code should be in no doubt about the current
23 * phase of computation at all times.
24 *
25 * Consider a parallel algorithm that involves separate phases of computation
26 * A, B and C where the output of each phase is needed before the next phase
27 * can begin.
28 *
29 * In the case of a static barrier initialized with 4 participants, each
30 * participant works on phase A, then calls BarrierArriveAndWait to wait until
31 * all 4 participants have reached that point. When BarrierArriveAndWait
32 * returns control, each participant can work on B, and so on. Because the
33 * barrier knows how many participants to expect, the phases of computation
34 * don't need labels or numbers, since each process's program counter implies
35 * the current phase. Even if some of the processes are slow to start up and
36 * begin running phase A, the other participants are expecting them and will
37 * patiently wait at the barrier. The code could be written as follows:
38 *
39 * perform_a();
40 * BarrierArriveAndWait(&barrier, ...);
41 * perform_b();
42 * BarrierArriveAndWait(&barrier, ...);
43 * perform_c();
44 * BarrierArriveAndWait(&barrier, ...);
45 *
46 * If the number of participants is not known up front, then a dynamic barrier
47 * is needed and the number should be set to zero at initialization. New
48 * complications arise because the number necessarily changes over time as
49 * participants attach and detach, and therefore phases B, C or even the end
50 * of processing may be reached before any given participant has started
51 * running and attached. Therefore the client code must perform an initial
52 * test of the phase number after attaching, because it needs to find out
53 * which phase of the algorithm has been reached by any participants that are
54 * already attached in order to synchronize with that work. Once the program
55 * counter or some other representation of current progress is synchronized
56 * with the barrier's phase, normal control flow can be used just as in the
57 * static case. Our example could be written using a switch statement with
58 * cases that fall-through, as follows:
59 *
60 * phase = BarrierAttach(&barrier);
61 * switch (phase)
62 * {
63 * case PHASE_A:
64 * perform_a();
65 * BarrierArriveAndWait(&barrier, ...);
66 * case PHASE_B:
67 * perform_b();
68 * BarrierArriveAndWait(&barrier, ...);
69 * case PHASE_C:
70 * perform_c();
71 * BarrierArriveAndWait(&barrier, ...);
72 * }
73 * BarrierDetach(&barrier);
74 *
75 * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic
76 * barriers behave similarly to Java's java.util.concurrent.Phaser.
77 *
78 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
79 *
80 * IDENTIFICATION
81 * src/backend/storage/ipc/barrier.c
82 *
83 *-------------------------------------------------------------------------
84 */
85
86#include "postgres.h"
87#include "storage/barrier.h"
88
89static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
90
91/*
92 * Initialize this barrier. To use a static party size, provide the number of
93 * participants to wait for at each phase indicating that that number of
94 * backends is implicitly attached. To use a dynamic party size, specify zero
95 * here and then use BarrierAttach() and
96 * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
97 * participants explicitly.
98 */
99void
100BarrierInit(Barrier *barrier, int participants)
101{
102 SpinLockInit(&barrier->mutex);
103 barrier->participants = participants;
104 barrier->arrived = 0;
105 barrier->phase = 0;
106 barrier->elected = 0;
107 barrier->static_party = participants > 0;
108 ConditionVariableInit(&barrier->condition_variable);
109}
110
111/*
112 * Arrive at this barrier, wait for all other attached participants to arrive
113 * too and then return. Increments the current phase. The caller must be
114 * attached.
115 *
116 * While waiting, pg_stat_activity shows a wait_event_type and wait_event
117 * controlled by the wait_event_info passed in, which should be a value from
118 * one of the WaitEventXXX enums defined in pgstat.h.
119 *
120 * Return true in one arbitrarily chosen participant. Return false in all
121 * others. The return code can be used to elect one participant to execute a
122 * phase of work that must be done serially while other participants wait.
123 */
124bool
126{
127 bool release = false;
128 bool elected;
129 int start_phase;
130 int next_phase;
131
132 SpinLockAcquire(&barrier->mutex);
133 start_phase = barrier->phase;
134 next_phase = start_phase + 1;
135 ++barrier->arrived;
136 if (barrier->arrived == barrier->participants)
137 {
138 release = true;
139 barrier->arrived = 0;
140 barrier->phase = next_phase;
141 barrier->elected = next_phase;
142 }
143 SpinLockRelease(&barrier->mutex);
144
145 /*
146 * If we were the last expected participant to arrive, we can release our
147 * peers and return true to indicate that this backend has been elected to
148 * perform any serial work.
149 */
150 if (release)
151 {
152 ConditionVariableBroadcast(&barrier->condition_variable);
153
154 return true;
155 }
156
157 /*
158 * Otherwise we have to wait for the last participant to arrive and
159 * advance the phase.
160 */
161 elected = false;
162 ConditionVariablePrepareToSleep(&barrier->condition_variable);
163 for (;;)
164 {
165 /*
166 * We know that phase must either be start_phase, indicating that we
167 * need to keep waiting, or next_phase, indicating that the last
168 * participant that we were waiting for has either arrived or detached
169 * so that the next phase has begun. The phase cannot advance any
170 * further than that without this backend's participation, because
171 * this backend is attached.
172 */
173 SpinLockAcquire(&barrier->mutex);
174 Assert(barrier->phase == start_phase || barrier->phase == next_phase);
175 release = barrier->phase == next_phase;
176 if (release && barrier->elected != next_phase)
177 {
178 /*
179 * Usually the backend that arrives last and releases the other
180 * backends is elected to return true (see above), so that it can
181 * begin processing serial work while it has a CPU timeslice.
182 * However, if the barrier advanced because someone detached, then
183 * one of the backends that is awoken will need to be elected.
184 */
185 barrier->elected = barrier->phase;
186 elected = true;
187 }
188 SpinLockRelease(&barrier->mutex);
189 if (release)
190 break;
191 ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
192 }
194
195 return elected;
196}
197
198/*
199 * Arrive at this barrier, but detach rather than waiting. Returns true if
200 * the caller was the last to detach.
201 */
202bool
204{
205 return BarrierDetachImpl(barrier, true);
206}
207
208/*
209 * Arrive at a barrier, and detach all but the last to arrive. Returns true if
210 * the caller was the last to arrive, and is therefore still attached.
211 */
212bool
214{
215 SpinLockAcquire(&barrier->mutex);
216 if (barrier->participants > 1)
217 {
218 --barrier->participants;
219 SpinLockRelease(&barrier->mutex);
220
221 return false;
222 }
223 Assert(barrier->participants == 1);
224 ++barrier->phase;
225 SpinLockRelease(&barrier->mutex);
226
227 return true;
228}
229
230/*
231 * Attach to a barrier. All waiting participants will now wait for this
232 * participant to call BarrierArriveAndWait(), BarrierDetach() or
233 * BarrierArriveAndDetach(). Return the current phase.
234 */
235int
237{
238 int phase;
239
240 Assert(!barrier->static_party);
241
242 SpinLockAcquire(&barrier->mutex);
243 ++barrier->participants;
244 phase = barrier->phase;
245 SpinLockRelease(&barrier->mutex);
246
247 return phase;
248}
249
250/*
251 * Detach from a barrier. This may release other waiters from
252 * BarrierArriveAndWait() and advance the phase if they were only waiting for
253 * this backend. Return true if this participant was the last to detach.
254 */
255bool
257{
258 return BarrierDetachImpl(barrier, false);
259}
260
261/*
262 * Return the current phase of a barrier. The caller must be attached.
263 */
264int
266{
267 /*
268 * It is OK to read barrier->phase without locking, because it can't
269 * change without us (we are attached to it), and we executed a memory
270 * barrier when we either attached or participated in changing it last
271 * time.
272 */
273 return barrier->phase;
274}
275
276/*
277 * Return an instantaneous snapshot of the number of participants currently
278 * attached to this barrier. For debugging purposes only.
279 */
280int
282{
283 int participants;
284
285 SpinLockAcquire(&barrier->mutex);
286 participants = barrier->participants;
287 SpinLockRelease(&barrier->mutex);
288
289 return participants;
290}
291
292/*
293 * Detach from a barrier. If 'arrive' is true then also increment the phase
294 * if there are no other participants. If there are other participants
295 * waiting, then the phase will be advanced and they'll be released if they
296 * were only waiting for the caller. Return true if this participant was the
297 * last to detach.
298 */
299static inline bool
301{
302 bool release;
303 bool last;
304
305 Assert(!barrier->static_party);
306
307 SpinLockAcquire(&barrier->mutex);
308 Assert(barrier->participants > 0);
309 --barrier->participants;
310
311 /*
312 * If any other participants are waiting and we were the last participant
313 * waited for, release them. If no other participants are waiting, but
314 * this is a BarrierArriveAndDetach() call, then advance the phase too.
315 */
316 if ((arrive || barrier->participants > 0) &&
317 barrier->arrived == barrier->participants)
318 {
319 release = true;
320 barrier->arrived = 0;
321 ++barrier->phase;
322 }
323 else
324 release = false;
325
326 last = barrier->participants == 0;
327 SpinLockRelease(&barrier->mutex);
328
329 if (release)
330 ConditionVariableBroadcast(&barrier->condition_variable);
331
332 return last;
333}
bool BarrierArriveAndDetachExceptLast(Barrier *barrier)
Definition: barrier.c:213
int BarrierParticipants(Barrier *barrier)
Definition: barrier.c:281
bool BarrierArriveAndDetach(Barrier *barrier)
Definition: barrier.c:203
static bool BarrierDetachImpl(Barrier *barrier, bool arrive)
Definition: barrier.c:300
int BarrierAttach(Barrier *barrier)
Definition: barrier.c:236
void BarrierInit(Barrier *barrier, int participants)
Definition: barrier.c:100
int BarrierPhase(Barrier *barrier)
Definition: barrier.c:265
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
Definition: barrier.c:125
bool BarrierDetach(Barrier *barrier)
Definition: barrier.c:256
#define Assert(condition)
Definition: c.h:812
uint32_t uint32
Definition: c.h:485
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
static THREAD_BARRIER_T barrier
Definition: pgbench.c:480
#define SpinLockInit(lock)
Definition: spin.h:57
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59