PostgreSQL Source Code git master
Loading...
Searching...
No Matches
syncrep.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
7 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all waiters each time we receive a reply.
31 *
32 * In 9.5 or before only a single standby could be considered as
33 * synchronous. In 9.6 we support a priority-based multiple synchronous
34 * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 * supported. The number of synchronous standbys that transactions
36 * must wait for replies from is specified in synchronous_standby_names.
37 * This parameter also specifies a list of standby names and the method
38 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 *
40 * The method FIRST specifies a priority-based synchronous replication
41 * and makes transaction commits wait until their WAL records are
42 * replicated to the requested number of synchronous standbys chosen based
43 * on their priorities. The standbys whose names appear earlier in the list
44 * are given higher priority and will be considered as synchronous.
45 * Other standby servers appearing later in this list represent potential
46 * synchronous standbys. If any of the current synchronous standbys
47 * disconnects for whatever reason, it will be replaced immediately with
48 * the next-highest-priority standby.
49 *
50 * The method ANY specifies a quorum-based synchronous replication
51 * and makes transaction commits wait until their WAL records are
52 * replicated to at least the requested number of synchronous standbys
53 * in the list. All the standbys appearing in the list are considered as
54 * candidates for quorum synchronous standbys.
55 *
56 * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 * This is for backward compatibility with 9.6 or before where only a
58 * priority-based sync replication was supported.
59 *
60 * Before the standbys chosen from synchronous_standby_names can
61 * become the synchronous standbys they must have caught up with
62 * the primary; that may take some time. Once caught up,
63 * the standbys which are considered as synchronous at that moment
64 * will release waiters from the queue.
65 *
66 * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
67 *
68 * IDENTIFICATION
69 * src/backend/replication/syncrep.c
70 *
71 *-------------------------------------------------------------------------
72 */
73#include "postgres.h"
74
75#include <unistd.h>
76
77#include "access/xact.h"
78#include "common/int.h"
79#include "miscadmin.h"
80#include "pgstat.h"
81#include "replication/syncrep.h"
84#include "storage/proc.h"
85#include "tcop/tcopprot.h"
86#include "utils/guc_hooks.h"
87#include "utils/ps_status.h"
88#include "utils/wait_event.h"
89
90/* User-settable parameters for sync rep */
92
93#define SyncStandbysDefined() \
94 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95
96static bool announce_next_takeover = true;
97
100
101static void SyncRepQueueInsert(int mode);
102static void SyncRepCancelWait(void);
103static int SyncRepWakeQueue(bool all, int mode);
104
108 bool *am_sync);
113 int num_standbys);
118 int num_standbys,
119 uint8 nth);
120static int SyncRepGetStandbyPriority(void);
121static int standby_priority_comparator(const void *a, const void *b);
122static int cmp_lsn(const void *a, const void *b);
123
124#ifdef USE_ASSERT_CHECKING
125static bool SyncRepQueueIsOrderedByLSN(int mode);
126#endif
127
128/*
129 * ===========================================================
130 * Synchronous Replication functions for normal user backends
131 * ===========================================================
132 */
133
134/*
135 * Wait for synchronous replication, if requested by user.
136 *
137 * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 * change that state to SYNC_REP_WAITING before adding ourselves
139 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 *
143 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 * represents a commit record. If it doesn't, then we wait only for the WAL
145 * to be flushed if synchronous_commit is set to the higher level of
146 * remote_apply, because only commit records provide apply feedback.
147 */
148void
150{
151 int mode;
152
153 /*
154 * This should be called while holding interrupts during a transaction
155 * commit to prevent the follow-up shared memory queue cleanups from being
156 * influenced by external interruptions.
157 */
159
160 /*
161 * Fast exit if user has not requested sync replication, or there are no
162 * sync replication standby names defined.
163 *
164 * Since this routine gets called every commit time, it's important to
165 * exit quickly if sync replication is not requested.
166 *
167 * We check WalSndCtl->sync_standbys_status flag without the lock and exit
168 * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
169 * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
170 * replication requested).
171 *
172 * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
173 * while holding the lock, to check the flag and operate the sync rep
174 * queue atomically. This is necessary to avoid the race condition
175 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
176 * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
177 * don't touch the queue.
178 */
179 if (!SyncRepRequested() ||
180 ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
182 return;
183
184 /* Cap the level for anything other than commit to remote flush only. */
185 if (commit)
187 else
189
192
195
196 /*
197 * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
198 * SyncRepUpdateSyncStandbysDefined().
199 *
200 * Also check that the standby hasn't already replied. Unlikely race
201 * condition but we'll be fetching that cache line anyway so it's likely
202 * to be a low cost check.
203 *
204 * If the sync standby data has not been initialized yet
205 * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
206 * then do a direct GUC check.
207 */
209 {
212 {
214 return;
215 }
216 }
217 else if (lsn <= WalSndCtl->lsn[mode])
218 {
219 /*
220 * The LSN is older than what we need to wait for. The sync standby
221 * data has not been initialized yet, but we are OK to not wait
222 * because we know that there is no point in doing so based on the
223 * LSN.
224 */
226 return;
227 }
228 else if (!SyncStandbysDefined())
229 {
230 /*
231 * If we are here, the sync standby data has not been initialized yet,
232 * and the LSN is newer than what we need to wait for, so we have fallen
233 * back to the best thing we could do in this case: a check on
234 * SyncStandbysDefined() to see if the GUC is set or not.
235 *
236 * When the GUC has a value, we wait until the checkpointer updates
237 * the status data because we cannot be sure yet if we should wait or
238 * not. Here, the GUC has *no* value, we are sure that there is no
239 * point to wait; this matters for example when initializing a
240 * cluster, where we should never wait, and no sync standbys is the
241 * default behavior.
242 */
244 return;
245 }
246
247 /*
248 * Set our waitLSN so WALSender will know when to wake us, and add
249 * ourselves to the queue.
250 */
251 MyProc->waitLSN = lsn;
256
257 /* Alter ps display to show waiting for sync rep. */
259 {
260 char buffer[32];
261
262 sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
263 set_ps_display_suffix(buffer);
264 }
265
266 /*
267 * Wait for specified LSN to be confirmed.
268 *
269 * Each proc has its own wait latch, so we perform a normal latch
270 * check/wait loop here.
271 */
272 for (;;)
273 {
274 int rc;
275
276 /* Must reset the latch before testing state. */
278
279 /*
280 * Acquiring the lock is not needed, the latch ensures proper
281 * barriers. If it looks like we're done, we must really be done,
282 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
283 * it will never update it again, so we can't be seeing a stale value
284 * in that case.
285 */
287 break;
288
289 /*
290 * If a wait for synchronous replication is pending, we can neither
291 * acknowledge the commit nor raise ERROR or FATAL. The latter would
292 * lead the client to believe that the transaction aborted, which is
293 * not true: it's already committed locally. The former is no good
294 * either: the client has requested synchronous replication, and is
295 * entitled to assume that an acknowledged commit is also replicated,
296 * which might not be true. So in this case we issue a WARNING (which
297 * some clients may be able to interpret) and shut off further output.
298 * We do NOT reset ProcDiePending, so that the process will die after
299 * the commit is cleaned up.
300 */
301 if (ProcDiePending)
302 {
303 if (ProcDieSenderPid != 0)
306 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
307 errdetail("The transaction has already committed locally, but might not have been replicated to the standby."),
308 errdetail_log("The transaction has already committed locally, but might not have been replicated to the standby. Signal sent by PID %d, UID %d.",
309 (int) ProcDieSenderPid,
310 (int) ProcDieSenderUid)));
311 else
314 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
315 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
318 break;
319 }
320
321 /*
322 * It's unclear what to do if a query cancel interrupt arrives. We
323 * can't actually abort at this point, but ignoring the interrupt
324 * altogether is not helpful, so we just terminate the wait with a
325 * suitable warning.
326 */
328 {
329 QueryCancelPending = false;
331 (errmsg("canceling wait for synchronous replication due to user request"),
332 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
334 break;
335 }
336
337 /*
338 * Wait on latch. Any condition that should wake us up will set the
339 * latch, so no need for timeout.
340 */
343
344 /*
345 * If the postmaster dies, we'll probably never get an acknowledgment,
346 * because all the wal sender processes will exit. So just bail out.
347 */
348 if (rc & WL_POSTMASTER_DEATH)
349 {
350 ProcDiePending = true;
353 break;
354 }
355 }
356
357 /*
358 * WalSender has checked our LSN and has removed us from queue. Clean up
359 * state and leave. It's OK to reset these shared memory fields without
360 * holding SyncRepLock, because any walsenders will ignore us anyway when
361 * we're not on the queue. We need a read barrier to make sure we see the
362 * changes to the queue link (this might be unnecessary without
363 * assertions, but better safe than sorry).
364 */
369
370 /* reset ps display to remove the suffix */
373}
374
375/*
376 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
377 *
378 * Usually we will go at tail of queue, though it's possible that we arrive
379 * here out of order, so start at tail and work back to insertion point.
380 */
381static void
383{
384 dlist_head *queue;
385 dlist_iter iter;
386
388 queue = &WalSndCtl->SyncRepQueue[mode];
389
390 dlist_reverse_foreach(iter, queue)
391 {
392 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
393
394 /*
395 * Stop at the queue element that we should insert after to ensure the
396 * queue is ordered by LSN.
397 */
398 if (proc->waitLSN < MyProc->waitLSN)
399 {
401 return;
402 }
403 }
404
405 /*
406 * If we get here, the list was either empty, or this process needs to be
407 * at the head.
408 */
410}
411
412/*
413 * Acquire SyncRepLock and cancel any wait currently in progress.
414 */
415static void
424
425void
427{
428 /*
429 * First check if we are removed from the queue without the lock to not
430 * slow down backend exit.
431 */
433 {
435
436 /* maybe we have just been removed, so recheck */
439
441 }
442}
443
444/*
445 * ===========================================================
446 * Synchronous Replication functions for wal sender processes
447 * ===========================================================
448 */
449
450/*
451 * Take any action required to initialise sync rep state from config
452 * data. Called at WALSender startup and after each SIGHUP.
453 */
454void
456{
457 int priority;
458
459 /*
460 * Determine if we are a potential sync standby and remember the result
461 * for handling replies from standby.
462 */
465 {
469
471 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
473 }
474}
475
476/*
477 * Update the LSNs on each queue based upon our latest state. This
478 * implements a simple policy of first-valid-sync-standby-releases-waiter.
479 *
480 * Other policies are possible, which would change what we do here and
481 * perhaps also which information we store as well.
482 */
483void
485{
490 bool got_recptr;
491 bool am_sync;
492 int numwrite = 0;
493 int numflush = 0;
494 int numapply = 0;
495
496 /*
497 * If this WALSender is serving a standby that is not on the list of
498 * potential sync standbys then we have nothing to do. If we are still
499 * starting up, still running base backup or the current flush position is
500 * still invalid, then leave quickly also. Streaming or stopping WAL
501 * senders are allowed to release waiters.
502 */
507 {
509 return;
510 }
511
512 /*
513 * We're a potential sync standby. Release waiters if there are enough
514 * sync standbys and we are considered as sync.
515 */
517
518 /*
519 * Check whether we are a sync standby or not, and calculate the synced
520 * positions among all sync standbys. (Note: although this step does not
521 * of itself require holding SyncRepLock, it seems like a good idea to do
522 * it after acquiring the lock. This ensures that the WAL pointers we use
523 * to release waiters are newer than any previous execution of this
524 * routine used.)
525 */
527
528 /*
529 * If we are managing a sync standby, though we weren't prior to this,
530 * then announce we are now a sync standby.
531 */
533 {
535
537 ereport(LOG,
538 (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
540 else
541 ereport(LOG,
542 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
544 }
545
546 /*
547 * If the number of sync standbys is less than requested or we aren't
548 * managing a sync standby then just leave.
549 */
550 if (!got_recptr || !am_sync)
551 {
554 return;
555 }
556
557 /*
558 * Set the lsn first so that when we wake backends they will release up to
559 * this location.
560 */
562 {
565 }
567 {
570 }
572 {
575 }
576
578
579 elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
583}
584
585/*
586 * Calculate the synced Write, Flush and Apply positions among sync standbys.
587 *
588 * Return false if the number of sync standbys is less than
589 * synchronous_standby_names specifies. Otherwise return true and
590 * store the positions into *writePtr, *flushPtr and *applyPtr.
591 *
592 * On return, *am_sync is set to true if this walsender is connected to
593 * a sync standby. Otherwise it's set to false.
594 */
595static bool
598{
600 int num_standbys;
601 int i;
602
603 /* Initialize default results */
607 *am_sync = false;
608
609 /* Quick out if not even configured to be synchronous */
610 if (SyncRepConfig == NULL)
611 return false;
612
613 /* Get standbys that are considered as synchronous at this moment */
615
616 /* Am I among the candidate sync standbys? */
617 for (i = 0; i < num_standbys; i++)
618 {
619 if (sync_standbys[i].is_me)
620 {
621 *am_sync = true;
622 break;
623 }
624 }
625
626 /*
627 * Nothing more to do if we are not managing a sync standby or there are
628 * not enough synchronous standbys.
629 */
630 if (!(*am_sync) ||
632 {
634 return false;
635 }
636
637 /*
638 * In a priority-based sync replication, the synced positions are the
639 * oldest ones among sync standbys. In a quorum-based, they are the Nth
640 * latest ones.
641 *
642 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
643 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
644 * because it's a bit more efficient.
645 *
646 * XXX If the numbers of current and requested sync standbys are the same,
647 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
648 * positions even in a quorum-based sync replication.
649 */
651 {
654 }
655 else
656 {
660 }
661
663 return true;
664}
665
666/*
667 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
668 */
669static void
674 int num_standbys)
675{
676 int i;
677
678 /*
679 * Scan through all sync standbys and calculate the oldest Write, Flush
680 * and Apply positions. We assume *writePtr et al were initialized to
681 * InvalidXLogRecPtr.
682 */
683 for (i = 0; i < num_standbys; i++)
684 {
686 XLogRecPtr flush = sync_standbys[i].flush;
687 XLogRecPtr apply = sync_standbys[i].apply;
688
690 *writePtr = write;
691 if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
692 *flushPtr = flush;
693 if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
694 *applyPtr = apply;
695 }
696}
697
698/*
699 * Calculate the Nth latest Write, Flush and Apply positions among sync
700 * standbys.
701 */
702static void
707 int num_standbys,
708 uint8 nth)
709{
713 int i;
714
715 /* Should have enough candidates, or somebody messed up */
716 Assert(nth > 0 && nth <= num_standbys);
717
721
722 for (i = 0; i < num_standbys; i++)
723 {
724 write_array[i] = sync_standbys[i].write;
725 flush_array[i] = sync_standbys[i].flush;
726 apply_array[i] = sync_standbys[i].apply;
727 }
728
729 /* Sort each array in descending order */
733
734 /* Get Nth latest Write, Flush, Apply positions */
735 *writePtr = write_array[nth - 1];
736 *flushPtr = flush_array[nth - 1];
737 *applyPtr = apply_array[nth - 1];
738
742}
743
744/*
745 * Compare lsn in order to sort array in descending order.
746 */
747static int
748cmp_lsn(const void *a, const void *b)
749{
750 XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
751 XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
752
753 return pg_cmp_u64(lsn2, lsn1);
754}
755
756/*
757 * Return data about walsenders that are candidates to be sync standbys.
758 *
759 * *standbys is set to a palloc'd array of structs of per-walsender data,
760 * and the number of valid entries (candidate sync senders) is returned.
761 * (This might be more or fewer than num_sync; caller must check.)
762 */
763int
765{
766 int i;
767 int n;
768
769 /* Create result array */
771
772 /* Quick exit if sync replication is not requested */
773 if (SyncRepConfig == NULL)
774 return 0;
775
776 /* Collect raw data from shared memory */
777 n = 0;
778 for (i = 0; i < max_wal_senders; i++)
779 {
780 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
781 * rearrangement */
783 WalSndState state; /* not included in SyncRepStandbyData */
784
786 stby = *standbys + n;
787
788 SpinLockAcquire(&walsnd->mutex);
789 stby->pid = walsnd->pid;
790 state = walsnd->state;
791 stby->write = walsnd->write;
792 stby->flush = walsnd->flush;
793 stby->apply = walsnd->apply;
794 stby->sync_standby_priority = walsnd->sync_standby_priority;
795 SpinLockRelease(&walsnd->mutex);
796
797 /* Must be active */
798 if (stby->pid == 0)
799 continue;
800
801 /* Must be streaming or stopping */
804 continue;
805
806 /* Must be synchronous */
807 if (stby->sync_standby_priority == 0)
808 continue;
809
810 /* Must have a valid flush position */
811 if (!XLogRecPtrIsValid(stby->flush))
812 continue;
813
814 /* OK, it's a candidate */
815 stby->walsnd_index = i;
816 stby->is_me = (walsnd == MyWalSnd);
817 n++;
818 }
819
820 /*
821 * In quorum mode, we return all the candidates. In priority mode, if we
822 * have too many candidates then return only the num_sync ones of highest
823 * priority.
824 */
827 {
828 /* Sort by priority ... */
829 qsort(*standbys, n, sizeof(SyncRepStandbyData),
831 /* ... then report just the first num_sync ones */
833 }
834
835 return n;
836}
837
838/*
839 * qsort comparator to sort SyncRepStandbyData entries by priority
840 */
841static int
842standby_priority_comparator(const void *a, const void *b)
843{
844 const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
845 const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
846
847 /* First, sort by increasing priority value */
848 if (sa->sync_standby_priority != sb->sync_standby_priority)
849 return sa->sync_standby_priority - sb->sync_standby_priority;
850
851 /*
852 * We might have equal priority values; arbitrarily break ties by position
853 * in the WalSnd array. (This is utterly bogus, since that is arrival
854 * order dependent, but there are regression tests that rely on it.)
855 */
856 return sa->walsnd_index - sb->walsnd_index;
857}
858
859
860/*
861 * Check if we are in the list of sync standbys, and if so, determine
862 * priority sequence. Return priority if set, or zero to indicate that
863 * we are not a potential sync standby.
864 *
865 * Compare the parameter SyncRepStandbyNames against the application_name
866 * for this WALSender, or allow any name if we find a wildcard "*".
867 */
868static int
870{
871 const char *standby_name;
872 int priority;
873 bool found = false;
874
875 /*
876 * Since synchronous cascade replication is not allowed, we always set the
877 * priority of cascading walsender to zero.
878 */
880 return 0;
881
883 return 0;
884
887 {
889 strcmp(standby_name, "*") == 0)
890 {
891 found = true;
892 break;
893 }
895 }
896
897 if (!found)
898 return 0;
899
900 /*
901 * In quorum-based sync replication, all the standbys in the list have the
902 * same priority, one.
903 */
905}
906
907/*
908 * Walk the specified queue from head. Set the state of any backends that
909 * need to be woken, remove them from the queue, and then wake them.
910 * Pass all = true to wake whole queue; otherwise, just wake up to
911 * the walsender's LSN.
912 *
913 * The caller must hold SyncRepLock in exclusive mode.
914 */
915static int
916SyncRepWakeQueue(bool all, int mode)
917{
919 int numprocs = 0;
921
925
927 {
928 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
929
930 /*
931 * Assume the queue is ordered by LSN
932 */
933 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
934 return numprocs;
935
936 /*
937 * Remove from queue.
938 */
940
941 /*
942 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
943 * make sure that it sees the queue link being removed before the
944 * syncRepState change.
945 */
947
948 /*
949 * Set state to complete; see SyncRepWaitForLSN() for discussion of
950 * the various states.
951 */
953
954 /*
955 * Wake only when we have set state and removed from queue.
956 */
957 SetLatch(&(proc->procLatch));
958
959 numprocs++;
960 }
961
962 return numprocs;
963}
964
965/*
966 * The checkpointer calls this as needed to update the shared
967 * sync_standbys_status flag, so that backends don't remain permanently wedged
968 * if synchronous_standby_names is unset. It's safe to check the current value
969 * without the lock, because it's only ever updated by one process. But we
970 * must take the lock to change it.
971 */
972void
974{
976
979 {
981
982 /*
983 * If synchronous_standby_names has been reset to empty, it's futile
984 * for backends to continue waiting. Since the user no longer wants
985 * synchronous replication, we'd better wake them up.
986 */
988 {
989 int i;
990
991 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
992 SyncRepWakeQueue(true, i);
993 }
994
995 /*
996 * Only allow people to join the queue when there are synchronous
997 * standbys defined. Without this interlock, there's a race
998 * condition: we might wake up all the current waiters; then, some
999 * backend that hasn't yet reloaded its config might go to sleep on
1000 * the queue (and never wake up). This prevents that.
1001 */
1004
1006 }
1008 {
1010
1011 /*
1012 * Note that there is no need to wake up the queues here. We would
1013 * reach this path only if SyncStandbysDefined() returns false, or it
1014 * would mean that some backends are waiting with the GUC set. See
1015 * SyncRepWaitForLSN().
1016 */
1018
1019 /*
1020 * Even if there is no sync standby defined, let the readers of this
1021 * information know that the sync standby data has been initialized.
1022 * This can just be done once, hence the previous check on
1023 * SYNC_STANDBY_INIT to avoid useless work.
1024 */
1026
1028 }
1029}
1030
1031#ifdef USE_ASSERT_CHECKING
1032static bool
1034{
1036 dlist_iter iter;
1037
1039
1041
1043 {
1044 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
1045
1046 /*
1047 * Check the queue is ordered by LSN and that multiple procs don't
1048 * have matching LSNs
1049 */
1050 if (proc->waitLSN <= lastLSN)
1051 return false;
1052
1053 lastLSN = proc->waitLSN;
1054 }
1055
1056 return true;
1057}
1058#endif
1059
1060/*
1061 * ===========================================================
1062 * Synchronous Replication functions executed by any process
1063 * ===========================================================
1064 */
1065
1066bool
1068{
1069 if (*newval != NULL && (*newval)[0] != '\0')
1070 {
1071 yyscan_t scanner;
1072 int parse_rc;
1074
1075 /* Result of parsing is returned in one of these two variables */
1078
1079 /* Parse the synchronous_standby_names string */
1080 syncrep_scanner_init(*newval, &scanner);
1082 syncrep_scanner_finish(scanner);
1083
1084 if (parse_rc != 0 || syncrep_parse_result == NULL)
1085 {
1089 else
1090 /* translator: %s is a GUC name */
1091 GUC_check_errdetail("\"%s\" parser failed.",
1092 "synchronous_standby_names");
1093 return false;
1094 }
1095
1096 if (syncrep_parse_result->num_sync <= 0)
1097 {
1098 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1099 syncrep_parse_result->num_sync);
1100 return false;
1101 }
1102
1103 /* GUC extra value must be guc_malloc'd, not palloc'd */
1105 guc_malloc(LOG, syncrep_parse_result->config_size);
1106 if (pconf == NULL)
1107 return false;
1109
1110 *extra = pconf;
1111
1112 /*
1113 * We need not explicitly clean up syncrep_parse_result. It, and any
1114 * other cruft generated during parsing, will be freed when the
1115 * current memory context is deleted. (This code is generally run in
1116 * a short-lived context used for config file processing, so that will
1117 * not be very long.)
1118 */
1119 }
1120 else
1121 *extra = NULL;
1122
1123 return true;
1124}
1125
1126void
1128{
1129 SyncRepConfig = (SyncRepConfigData *) extra;
1130}
1131
1132void
1134{
1135 switch (newval)
1136 {
1139 break;
1142 break;
1145 break;
1146 default:
1148 break;
1149 }
1150}
#define pg_read_barrier()
Definition atomics.h:154
#define pg_write_barrier()
Definition atomics.h:155
#define Min(x, y)
Definition c.h:1091
uint8_t uint8
Definition c.h:622
#define Assert(condition)
Definition c.h:943
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
void * yyscan_t
Definition cubedata.h:65
@ DestNone
Definition dest.h:87
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define DEBUG1
Definition elog.h:31
int int int errdetail_log(const char *fmt,...) pg_attribute_printf(1
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
#define palloc_array(type, count)
Definition fe_memutils.h:76
volatile int ProcDieSenderPid
Definition globals.c:46
volatile uint32 InterruptHoldoffCount
Definition globals.c:43
volatile int ProcDieSenderUid
Definition globals.c:47
volatile sig_atomic_t QueryCancelPending
Definition globals.c:33
struct Latch * MyLatch
Definition globals.c:65
volatile sig_atomic_t ProcDiePending
Definition globals.c:34
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6666
void * guc_malloc(int elevel, size_t size)
Definition guc.c:637
#define newval
#define GUC_check_errmsg
Definition guc.h:503
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
char * application_name
Definition guc_tables.c:589
static void dlist_insert_after(dlist_node *after, dlist_node *node)
Definition ilist.h:381
#define dlist_foreach(iter, lhead)
Definition ilist.h:623
static void dlist_delete_thoroughly(dlist_node *node)
Definition ilist.h:416
static bool dlist_node_is_detached(const dlist_node *node)
Definition ilist.h:525
#define dlist_reverse_foreach(iter, lhead)
Definition ilist.h:654
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition ilist.h:347
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
static int pg_cmp_u64(uint64 a, uint64 b)
Definition int.h:731
#define write(a, b, c)
Definition win32.h:14
int b
Definition isn.c:74
int a
Definition isn.c:73
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
static char * errmsg
static PgChecksumMode mode
static rewind_source * source
Definition pg_rewind.c:89
int pg_strcasecmp(const char *s1, const char *s2)
#define sprintf
Definition port.h:262
#define qsort(a, b, c, d)
Definition port.h:495
CommandDest whereToSendOutput
Definition postgres.c:97
static int fb(int x)
void set_ps_display_remove_suffix(void)
Definition ps_status.c:440
void set_ps_display_suffix(const char *suffix)
Definition ps_status.c:388
bool update_process_title
Definition ps_status.c:31
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
Definition proc.h:179
XLogRecPtr waitLSN
Definition proc.h:341
dlist_node syncRepLinks
Definition proc.h:343
int syncRepState
Definition proc.h:342
Latch procLatch
Definition proc.h:256
uint8 syncrep_method
Definition syncrep.h:68
char member_names[FLEXIBLE_ARRAY_MEMBER]
Definition syncrep.h:71
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
slock_t mutex
XLogRecPtr flush
WalSndState state
int sync_standby_priority
dlist_node * cur
Definition ilist.h:179
dlist_node * cur
Definition ilist.h:200
static int SyncRepWaitMode
Definition syncrep.c:99
void SyncRepInitConfig(void)
Definition syncrep.c:455
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
Definition syncrep.c:149
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
Definition syncrep.c:596
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
Definition syncrep.c:703
void assign_synchronous_commit(int newval, void *extra)
Definition syncrep.c:1133
void assign_synchronous_standby_names(const char *newval, void *extra)
Definition syncrep.c:1127
static int standby_priority_comparator(const void *a, const void *b)
Definition syncrep.c:842
static int SyncRepWakeQueue(bool all, int mode)
Definition syncrep.c:916
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:764
void SyncRepReleaseWaiters(void)
Definition syncrep.c:484
void SyncRepUpdateSyncStandbysDefined(void)
Definition syncrep.c:973
static bool announce_next_takeover
Definition syncrep.c:96
static int SyncRepGetStandbyPriority(void)
Definition syncrep.c:869
char * SyncRepStandbyNames
Definition syncrep.c:91
static void SyncRepQueueInsert(int mode)
Definition syncrep.c:382
static void SyncRepCancelWait(void)
Definition syncrep.c:416
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
Definition syncrep.c:1067
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
Definition syncrep.c:670
void SyncRepCleanupAtProcExit(void)
Definition syncrep.c:426
static int cmp_lsn(const void *a, const void *b)
Definition syncrep.c:748
#define SyncStandbysDefined()
Definition syncrep.c:93
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27
#define SyncRepRequested()
Definition syncrep.h:18
#define SYNC_REP_NO_WAIT
Definition syncrep.h:22
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAITING
Definition syncrep.h:31
#define SYNC_REP_WAIT_COMPLETE
Definition syncrep.h:32
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_NOT_WAITING
Definition syncrep.h:30
int syncrep_yyparse(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
void syncrep_scanner_finish(yyscan_t yyscanner)
void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp)
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
WalSnd * MyWalSnd
Definition walsender.c:132
int max_wal_senders
Definition walsender.c:141
bool am_cascading_walsender
Definition walsender.c:136
WalSndCtlData * WalSndCtl
Definition walsender.c:121
#define SYNC_STANDBY_DEFINED
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_STOPPING
#define SYNC_STANDBY_INIT
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
Definition xact.h:73
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
Definition xact.h:76
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
Definition xact.h:75
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28