PostgreSQL Source Code git master
Loading...
Searching...
No Matches
syncrep.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
7 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all waiters each time we receive a reply.
31 *
32 * In 9.5 and earlier, only a single standby could be considered
33 * synchronous. Since 9.6, multiple synchronous standbys chosen by priority
34 * are supported. Since 10, quorum-based multiple synchronous standbys are also
35 * supported. The number of synchronous standbys that transactions
36 * must wait for replies from is specified in synchronous_standby_names.
37 * This parameter also specifies a list of standby names and the method
38 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 *
40 * The method FIRST specifies a priority-based synchronous replication
41 * and makes transaction commits wait until their WAL records are
42 * replicated to the requested number of synchronous standbys chosen based
43 * on their priorities. The standbys whose names appear earlier in the list
44 * are given higher priority and will be considered as synchronous.
45 * Other standby servers appearing later in this list represent potential
46 * synchronous standbys. If any of the current synchronous standbys
47 * disconnects for whatever reason, it will be replaced immediately with
48 * the next-highest-priority standby.
49 *
50 * The method ANY specifies a quorum-based synchronous replication
51 * and makes transaction commits wait until their WAL records are
52 * replicated to at least the requested number of synchronous standbys
53 * in the list. All the standbys appearing in the list are considered as
54 * candidates for quorum synchronous standbys.
55 *
56 * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 * This is for backward compatibility with 9.6 or before where only a
58 * priority-based sync replication was supported.
59 *
60 * Before the standbys chosen from synchronous_standby_names can
61 * become the synchronous standbys they must have caught up with
62 * the primary; that may take some time. Once caught up,
63 * the standbys which are considered as synchronous at that moment
64 * will release waiters from the queue.
65 *
66 * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
67 *
68 * IDENTIFICATION
69 * src/backend/replication/syncrep.c
70 *
71 *-------------------------------------------------------------------------
72 */
73#include "postgres.h"
74
75#include <unistd.h>
76
77#include "access/xact.h"
78#include "common/int.h"
79#include "miscadmin.h"
80#include "pgstat.h"
81#include "replication/syncrep.h"
84#include "storage/proc.h"
85#include "tcop/tcopprot.h"
86#include "utils/guc_hooks.h"
87#include "utils/ps_status.h"
88#include "utils/wait_event.h"
89
90/* User-settable parameters for sync rep */
92
93#define SyncStandbysDefined() \
94 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95
96static bool announce_next_takeover = true;
97
100
101static void SyncRepQueueInsert(int mode);
102static void SyncRepCancelWait(void);
103static int SyncRepWakeQueue(bool all, int mode);
104
108 bool *am_sync);
113 int num_standbys);
118 int num_standbys,
119 uint8 nth);
120static int SyncRepGetStandbyPriority(void);
121static int standby_priority_comparator(const void *a, const void *b);
122static int cmp_lsn(const void *a, const void *b);
123
124#ifdef USE_ASSERT_CHECKING
125static bool SyncRepQueueIsOrderedByLSN(int mode);
126#endif
127
128/*
129 * ===========================================================
130 * Synchronous Replication functions for normal user backends
131 * ===========================================================
132 */
133
134/*
135 * Wait for synchronous replication, if requested by user.
136 *
137 * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 * change that state to SYNC_REP_WAITING before adding ourselves
139 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 *
143 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 * represents a commit record. If it doesn't, then we wait only for the WAL
145 * to be flushed if synchronous_commit is set to the higher level of
146 * remote_apply, because only commit records provide apply feedback.
147 */
148void
150{
151 int mode;
152
153 /*
154 * This should be called while holding interrupts during a transaction
155 * commit to prevent the follow-up shared memory queue cleanups from being
156 * influenced by external interruptions.
157 */
159
160 /*
161 * Fast exit if user has not requested sync replication, or there are no
162 * sync replication standby names defined.
163 *
164 * Since this routine gets called every commit time, it's important to
165 * exit quickly if sync replication is not requested.
166 *
167 * We check WalSndCtl->sync_standbys_status flag without the lock and exit
168 * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
169 * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
170 * replication requested).
171 *
172 * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
173 * while holding the lock, to check the flag and operate the sync rep
174 * queue atomically. This is necessary to avoid the race condition
175 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
176 * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
177 * don't touch the queue.
178 */
179 if (!SyncRepRequested() ||
180 ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
182 return;
183
184 /* Cap the level for anything other than commit to remote flush only. */
185 if (commit)
187 else
189
192
195
196 /*
197 * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
198 * SyncRepUpdateSyncStandbysDefined().
199 *
200 * Also check that the standby hasn't already replied. Unlikely race
201 * condition but we'll be fetching that cache line anyway so it's likely
202 * to be a low cost check.
203 *
204 * If the sync standby data has not been initialized yet
205 * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
206 * then do a direct GUC check.
207 */
209 {
212 {
214 return;
215 }
216 }
217 else if (lsn <= WalSndCtl->lsn[mode])
218 {
219 /*
220 * The LSN is older than what we need to wait for. The sync standby
221 * data has not been initialized yet, but we are OK to not wait
222 * because we know that there is no point in doing so based on the
223 * LSN.
224 */
226 return;
227 }
228 else if (!SyncStandbysDefined())
229 {
230 /*
231 * If we are here, the sync standby data has not been initialized yet,
232 * and the LSN is newer than what we need to wait for, so we have fallen
233 * back to the best thing we could do in this case: a check on
234 * SyncStandbysDefined() to see if the GUC is set or not.
235 *
236 * When the GUC has a value, we wait until the checkpointer updates
237 * the status data because we cannot be sure yet if we should wait or
238 * not. Here, the GUC has *no* value, we are sure that there is no
239 * point to wait; this matters for example when initializing a
240 * cluster, where we should never wait, and no sync standbys is the
241 * default behavior.
242 */
244 return;
245 }
246
247 /*
248 * Set our waitLSN so WALSender will know when to wake us, and add
249 * ourselves to the queue.
250 */
251 MyProc->waitLSN = lsn;
256
257 /* Alter ps display to show waiting for sync rep. */
259 {
260 char buffer[32];
261
262 sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
263 set_ps_display_suffix(buffer);
264 }
265
266 /*
267 * Wait for specified LSN to be confirmed.
268 *
269 * Each proc has its own wait latch, so we perform a normal latch
270 * check/wait loop here.
271 */
272 for (;;)
273 {
274 int rc;
275
276 /* Must reset the latch before testing state. */
278
279 /*
280 * Acquiring the lock is not needed, the latch ensures proper
281 * barriers. If it looks like we're done, we must really be done,
282 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
283 * it will never update it again, so we can't be seeing a stale value
284 * in that case.
285 */
287 break;
288
289 /*
290 * If a wait for synchronous replication is pending, we can neither
291 * acknowledge the commit nor raise ERROR or FATAL. The latter would
292 * lead the client to believe that the transaction aborted, which is
293 * not true: it's already committed locally. The former is no good
294 * either: the client has requested synchronous replication, and is
295 * entitled to assume that an acknowledged commit is also replicated,
296 * which might not be true. So in this case we issue a WARNING (which
297 * some clients may be able to interpret) and shut off further output.
298 * We do NOT reset ProcDiePending, so that the process will die after
299 * the commit is cleaned up.
300 */
301 if (ProcDiePending)
302 {
303 if (ProcDieSenderPid != 0)
306 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
307 errdetail("The transaction has already committed locally, but might not have been replicated to the standby. Signal sent by PID %d, UID %d.",
308 (int) ProcDieSenderPid,
309 (int) ProcDieSenderUid)));
310 else
313 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
314 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
317 break;
318 }
319
320 /*
321 * It's unclear what to do if a query cancel interrupt arrives. We
322 * can't actually abort at this point, but ignoring the interrupt
323 * altogether is not helpful, so we just terminate the wait with a
324 * suitable warning.
325 */
327 {
328 QueryCancelPending = false;
330 (errmsg("canceling wait for synchronous replication due to user request"),
331 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
333 break;
334 }
335
336 /*
337 * Wait on latch. Any condition that should wake us up will set the
338 * latch, so no need for timeout.
339 */
342
343 /*
344 * If the postmaster dies, we'll probably never get an acknowledgment,
345 * because all the wal sender processes will exit. So just bail out.
346 */
347 if (rc & WL_POSTMASTER_DEATH)
348 {
349 ProcDiePending = true;
352 break;
353 }
354 }
355
356 /*
357 * WalSender has checked our LSN and has removed us from queue. Clean up
358 * state and leave. It's OK to reset these shared memory fields without
359 * holding SyncRepLock, because any walsenders will ignore us anyway when
360 * we're not on the queue. We need a read barrier to make sure we see the
361 * changes to the queue link (this might be unnecessary without
362 * assertions, but better safe than sorry).
363 */
368
369 /* reset ps display to remove the suffix */
372}
373
374/*
375 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
376 *
377 * Usually we will go at tail of queue, though it's possible that we arrive
378 * here out of order, so start at tail and work back to insertion point.
379 */
380static void
382{
383 dlist_head *queue;
384 dlist_iter iter;
385
387 queue = &WalSndCtl->SyncRepQueue[mode];
388
389 dlist_reverse_foreach(iter, queue)
390 {
391 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
392
393 /*
394 * Stop at the queue element that we should insert after to ensure the
395 * queue is ordered by LSN.
396 */
397 if (proc->waitLSN < MyProc->waitLSN)
398 {
400 return;
401 }
402 }
403
404 /*
405 * If we get here, the list was either empty, or this process needs to be
406 * at the head.
407 */
409}
410
411/*
412 * Acquire SyncRepLock and cancel any wait currently in progress.
413 */
414static void
423
424void
426{
427 /*
428 * First check if we are removed from the queue without the lock to not
429 * slow down backend exit.
430 */
432 {
434
435 /* maybe we have just been removed, so recheck */
438
440 }
441}
442
443/*
444 * ===========================================================
445 * Synchronous Replication functions for wal sender processes
446 * ===========================================================
447 */
448
449/*
450 * Take any action required to initialise sync rep state from config
451 * data. Called at WALSender startup and after each SIGHUP.
452 */
453void
455{
456 int priority;
457
458 /*
459 * Determine if we are a potential sync standby and remember the result
460 * for handling replies from standby.
461 */
464 {
468
470 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
472 }
473}
474
475/*
476 * Update the LSNs on each queue based upon our latest state. This
477 * implements a simple policy of first-valid-sync-standby-releases-waiter.
478 *
479 * Other policies are possible, which would change what we do here and
480 * perhaps also which information we store as well.
481 */
482void
484{
489 bool got_recptr;
490 bool am_sync;
491 int numwrite = 0;
492 int numflush = 0;
493 int numapply = 0;
494
495 /*
496 * If this WALSender is serving a standby that is not on the list of
497 * potential sync standbys then we have nothing to do. If we are still
498 * starting up, still running base backup or the current flush position is
499 * still invalid, then leave quickly also. Streaming or stopping WAL
500 * senders are allowed to release waiters.
501 */
506 {
508 return;
509 }
510
511 /*
512 * We're a potential sync standby. Release waiters if there are enough
513 * sync standbys and we are considered as sync.
514 */
516
517 /*
518 * Check whether we are a sync standby or not, and calculate the synced
519 * positions among all sync standbys. (Note: although this step does not
520 * of itself require holding SyncRepLock, it seems like a good idea to do
521 * it after acquiring the lock. This ensures that the WAL pointers we use
522 * to release waiters are newer than any previous execution of this
523 * routine used.)
524 */
526
527 /*
528 * If we are managing a sync standby, though we weren't prior to this,
529 * then announce we are now a sync standby.
530 */
532 {
534
536 ereport(LOG,
537 (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
539 else
540 ereport(LOG,
541 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
543 }
544
545 /*
546 * If the number of sync standbys is less than requested or we aren't
547 * managing a sync standby then just leave.
548 */
549 if (!got_recptr || !am_sync)
550 {
553 return;
554 }
555
556 /*
557 * Set the lsn first so that when we wake backends they will release up to
558 * this location.
559 */
561 {
564 }
566 {
569 }
571 {
574 }
575
577
578 elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
582}
583
584/*
585 * Calculate the synced Write, Flush and Apply positions among sync standbys.
586 *
587 * Return false if the number of sync standbys is less than
588 * synchronous_standby_names specifies. Otherwise return true and
589 * store the positions into *writePtr, *flushPtr and *applyPtr.
590 *
591 * On return, *am_sync is set to true if this walsender is connected to
592 * a sync standby. Otherwise it's set to false.
593 */
594static bool
597{
599 int num_standbys;
600 int i;
601
602 /* Initialize default results */
606 *am_sync = false;
607
608 /* Quick out if not even configured to be synchronous */
609 if (SyncRepConfig == NULL)
610 return false;
611
612 /* Get standbys that are considered as synchronous at this moment */
614
615 /* Am I among the candidate sync standbys? */
616 for (i = 0; i < num_standbys; i++)
617 {
618 if (sync_standbys[i].is_me)
619 {
620 *am_sync = true;
621 break;
622 }
623 }
624
625 /*
626 * Nothing more to do if we are not managing a sync standby or there are
627 * not enough synchronous standbys.
628 */
629 if (!(*am_sync) ||
631 {
633 return false;
634 }
635
636 /*
637 * In a priority-based sync replication, the synced positions are the
638 * oldest ones among sync standbys. In a quorum-based, they are the Nth
639 * latest ones.
640 *
641 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
642 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
643 * because it's a bit more efficient.
644 *
645 * XXX If the numbers of current and requested sync standbys are the same,
646 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
647 * positions even in a quorum-based sync replication.
648 */
650 {
653 }
654 else
655 {
659 }
660
662 return true;
663}
664
665/*
666 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
667 */
668static void
673 int num_standbys)
674{
675 int i;
676
677 /*
678 * Scan through all sync standbys and calculate the oldest Write, Flush
679 * and Apply positions. We assume *writePtr et al were initialized to
680 * InvalidXLogRecPtr.
681 */
682 for (i = 0; i < num_standbys; i++)
683 {
685 XLogRecPtr flush = sync_standbys[i].flush;
686 XLogRecPtr apply = sync_standbys[i].apply;
687
689 *writePtr = write;
690 if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
691 *flushPtr = flush;
692 if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
693 *applyPtr = apply;
694 }
695}
696
697/*
698 * Calculate the Nth latest Write, Flush and Apply positions among sync
699 * standbys.
700 */
701static void
706 int num_standbys,
707 uint8 nth)
708{
712 int i;
713
714 /* Should have enough candidates, or somebody messed up */
715 Assert(nth > 0 && nth <= num_standbys);
716
720
721 for (i = 0; i < num_standbys; i++)
722 {
723 write_array[i] = sync_standbys[i].write;
724 flush_array[i] = sync_standbys[i].flush;
725 apply_array[i] = sync_standbys[i].apply;
726 }
727
728 /* Sort each array in descending order */
732
733 /* Get Nth latest Write, Flush, Apply positions */
734 *writePtr = write_array[nth - 1];
735 *flushPtr = flush_array[nth - 1];
736 *applyPtr = apply_array[nth - 1];
737
741}
742
743/*
744 * Compare lsn in order to sort array in descending order.
745 */
746static int
747cmp_lsn(const void *a, const void *b)
748{
749 XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
750 XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
751
752 return pg_cmp_u64(lsn2, lsn1);
753}
754
755/*
756 * Return data about walsenders that are candidates to be sync standbys.
757 *
758 * *standbys is set to a palloc'd array of structs of per-walsender data,
759 * and the number of valid entries (candidate sync senders) is returned.
760 * (This might be more or fewer than num_sync; caller must check.)
761 */
762int
764{
765 int i;
766 int n;
767
768 /* Create result array */
770
771 /* Quick exit if sync replication is not requested */
772 if (SyncRepConfig == NULL)
773 return 0;
774
775 /* Collect raw data from shared memory */
776 n = 0;
777 for (i = 0; i < max_wal_senders; i++)
778 {
779 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
780 * rearrangement */
782 WalSndState state; /* not included in SyncRepStandbyData */
783
785 stby = *standbys + n;
786
787 SpinLockAcquire(&walsnd->mutex);
788 stby->pid = walsnd->pid;
789 state = walsnd->state;
790 stby->write = walsnd->write;
791 stby->flush = walsnd->flush;
792 stby->apply = walsnd->apply;
793 stby->sync_standby_priority = walsnd->sync_standby_priority;
794 SpinLockRelease(&walsnd->mutex);
795
796 /* Must be active */
797 if (stby->pid == 0)
798 continue;
799
800 /* Must be streaming or stopping */
803 continue;
804
805 /* Must be synchronous */
806 if (stby->sync_standby_priority == 0)
807 continue;
808
809 /* Must have a valid flush position */
810 if (!XLogRecPtrIsValid(stby->flush))
811 continue;
812
813 /* OK, it's a candidate */
814 stby->walsnd_index = i;
815 stby->is_me = (walsnd == MyWalSnd);
816 n++;
817 }
818
819 /*
820 * In quorum mode, we return all the candidates. In priority mode, if we
821 * have too many candidates then return only the num_sync ones of highest
822 * priority.
823 */
826 {
827 /* Sort by priority ... */
828 qsort(*standbys, n, sizeof(SyncRepStandbyData),
830 /* ... then report just the first num_sync ones */
832 }
833
834 return n;
835}
836
837/*
838 * qsort comparator to sort SyncRepStandbyData entries by priority
839 */
840static int
841standby_priority_comparator(const void *a, const void *b)
842{
843 const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
844 const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
845
846 /* First, sort by increasing priority value */
847 if (sa->sync_standby_priority != sb->sync_standby_priority)
848 return sa->sync_standby_priority - sb->sync_standby_priority;
849
850 /*
851 * We might have equal priority values; arbitrarily break ties by position
852 * in the WalSnd array. (This is utterly bogus, since that is arrival
853 * order dependent, but there are regression tests that rely on it.)
854 */
855 return sa->walsnd_index - sb->walsnd_index;
856}
857
858
859/*
860 * Check if we are in the list of sync standbys, and if so, determine
861 * priority sequence. Return priority if set, or zero to indicate that
862 * we are not a potential sync standby.
863 *
864 * Compare the parameter SyncRepStandbyNames against the application_name
865 * for this WALSender, or allow any name if we find a wildcard "*".
866 */
867static int
869{
870 const char *standby_name;
871 int priority;
872 bool found = false;
873
874 /*
875 * Since synchronous cascade replication is not allowed, we always set the
876 * priority of cascading walsender to zero.
877 */
879 return 0;
880
882 return 0;
883
886 {
888 strcmp(standby_name, "*") == 0)
889 {
890 found = true;
891 break;
892 }
894 }
895
896 if (!found)
897 return 0;
898
899 /*
900 * In quorum-based sync replication, all the standbys in the list have the
901 * same priority, one.
902 */
904}
905
906/*
907 * Walk the specified queue from head. Set the state of any backends that
908 * need to be woken, remove them from the queue, and then wake them.
909 * Pass all = true to wake whole queue; otherwise, just wake up to
910 * the walsender's LSN.
911 *
912 * The caller must hold SyncRepLock in exclusive mode.
913 */
914static int
915SyncRepWakeQueue(bool all, int mode)
916{
918 int numprocs = 0;
920
924
926 {
927 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
928
929 /*
930 * Assume the queue is ordered by LSN
931 */
932 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
933 return numprocs;
934
935 /*
936 * Remove from queue.
937 */
939
940 /*
941 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
942 * make sure that it sees the queue link being removed before the
943 * syncRepState change.
944 */
946
947 /*
948 * Set state to complete; see SyncRepWaitForLSN() for discussion of
949 * the various states.
950 */
952
953 /*
954 * Wake only when we have set state and removed from queue.
955 */
956 SetLatch(&(proc->procLatch));
957
958 numprocs++;
959 }
960
961 return numprocs;
962}
963
964/*
965 * The checkpointer calls this as needed to update the shared
966 * sync_standbys_status flag, so that backends don't remain permanently wedged
967 * if synchronous_standby_names is unset. It's safe to check the current value
968 * without the lock, because it's only ever updated by one process. But we
969 * must take the lock to change it.
970 */
971void
973{
975
978 {
980
981 /*
982 * If synchronous_standby_names has been reset to empty, it's futile
983 * for backends to continue waiting. Since the user no longer wants
984 * synchronous replication, we'd better wake them up.
985 */
987 {
988 int i;
989
990 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
991 SyncRepWakeQueue(true, i);
992 }
993
994 /*
995 * Only allow people to join the queue when there are synchronous
996 * standbys defined. Without this interlock, there's a race
997 * condition: we might wake up all the current waiters; then, some
998 * backend that hasn't yet reloaded its config might go to sleep on
999 * the queue (and never wake up). This prevents that.
1000 */
1003
1005 }
1007 {
1009
1010 /*
1011 * Note that there is no need to wake up the queues here. We would
1012 * reach this path only if SyncStandbysDefined() returns false, or it
1013 * would mean that some backends are waiting with the GUC set. See
1014 * SyncRepWaitForLSN().
1015 */
1017
1018 /*
1019 * Even if there is no sync standby defined, let the readers of this
1020 * information know that the sync standby data has been initialized.
1021 * This can just be done once, hence the previous check on
1022 * SYNC_STANDBY_INIT to avoid useless work.
1023 */
1025
1027 }
1028}
1029
1030#ifdef USE_ASSERT_CHECKING
1031static bool
1033{
1035 dlist_iter iter;
1036
1038
1040
1042 {
1043 PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
1044
1045 /*
1046 * Check the queue is ordered by LSN and that multiple procs don't
1047 * have matching LSNs
1048 */
1049 if (proc->waitLSN <= lastLSN)
1050 return false;
1051
1052 lastLSN = proc->waitLSN;
1053 }
1054
1055 return true;
1056}
1057#endif
1058
1059/*
1060 * ===========================================================
1061 * Synchronous Replication functions executed by any process
1062 * ===========================================================
1063 */
1064
1065bool
1067{
1068 if (*newval != NULL && (*newval)[0] != '\0')
1069 {
1070 yyscan_t scanner;
1071 int parse_rc;
1073
1074 /* Result of parsing is returned in one of these two variables */
1077
1078 /* Parse the synchronous_standby_names string */
1079 syncrep_scanner_init(*newval, &scanner);
1081 syncrep_scanner_finish(scanner);
1082
1083 if (parse_rc != 0 || syncrep_parse_result == NULL)
1084 {
1088 else
1089 /* translator: %s is a GUC name */
1090 GUC_check_errdetail("\"%s\" parser failed.",
1091 "synchronous_standby_names");
1092 return false;
1093 }
1094
1095 if (syncrep_parse_result->num_sync <= 0)
1096 {
1097 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1098 syncrep_parse_result->num_sync);
1099 return false;
1100 }
1101
1102 /* GUC extra value must be guc_malloc'd, not palloc'd */
1104 guc_malloc(LOG, syncrep_parse_result->config_size);
1105 if (pconf == NULL)
1106 return false;
1108
1109 *extra = pconf;
1110
1111 /*
1112 * We need not explicitly clean up syncrep_parse_result. It, and any
1113 * other cruft generated during parsing, will be freed when the
1114 * current memory context is deleted. (This code is generally run in
1115 * a short-lived context used for config file processing, so that will
1116 * not be very long.)
1117 */
1118 }
1119 else
1120 *extra = NULL;
1121
1122 return true;
1123}
1124
1125void
1127{
1128 SyncRepConfig = (SyncRepConfigData *) extra;
1129}
1130
1131void
1133{
1134 switch (newval)
1135 {
1138 break;
1141 break;
1144 break;
1145 default:
1147 break;
1148 }
1149}
#define pg_read_barrier()
Definition atomics.h:154
#define pg_write_barrier()
Definition atomics.h:155
#define Min(x, y)
Definition c.h:1091
uint8_t uint8
Definition c.h:622
#define Assert(condition)
Definition c.h:943
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
void * yyscan_t
Definition cubedata.h:65
@ DestNone
Definition dest.h:87
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
#define DEBUG3
Definition elog.h:29
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define DEBUG1
Definition elog.h:31
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
#define palloc_array(type, count)
Definition fe_memutils.h:76
volatile int ProcDieSenderPid
Definition globals.c:46
volatile uint32 InterruptHoldoffCount
Definition globals.c:43
volatile int ProcDieSenderUid
Definition globals.c:47
volatile sig_atomic_t QueryCancelPending
Definition globals.c:33
struct Latch * MyLatch
Definition globals.c:65
volatile sig_atomic_t ProcDiePending
Definition globals.c:34
void GUC_check_errcode(int sqlerrcode)
Definition guc.c:6666
void * guc_malloc(int elevel, size_t size)
Definition guc.c:637
#define newval
#define GUC_check_errmsg
Definition guc.h:503
#define GUC_check_errdetail
Definition guc.h:507
GucSource
Definition guc.h:112
char * application_name
Definition guc_tables.c:589
static void dlist_insert_after(dlist_node *after, dlist_node *node)
Definition ilist.h:381
#define dlist_foreach(iter, lhead)
Definition ilist.h:623
static void dlist_delete_thoroughly(dlist_node *node)
Definition ilist.h:416
static bool dlist_node_is_detached(const dlist_node *node)
Definition ilist.h:525
#define dlist_reverse_foreach(iter, lhead)
Definition ilist.h:654
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition ilist.h:347
#define dlist_foreach_modify(iter, lhead)
Definition ilist.h:640
#define dlist_container(type, membername, ptr)
Definition ilist.h:593
static int pg_cmp_u64(uint64 a, uint64 b)
Definition int.h:731
#define write(a, b, c)
Definition win32.h:14
int b
Definition isn.c:74
int a
Definition isn.c:73
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
static char * errmsg
static PgChecksumMode mode
static rewind_source * source
Definition pg_rewind.c:89
int pg_strcasecmp(const char *s1, const char *s2)
#define sprintf
Definition port.h:262
#define qsort(a, b, c, d)
Definition port.h:495
CommandDest whereToSendOutput
Definition postgres.c:97
static int fb(int x)
void set_ps_display_remove_suffix(void)
Definition ps_status.c:440
void set_ps_display_suffix(const char *suffix)
Definition ps_status.c:388
bool update_process_title
Definition ps_status.c:31
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
PGPROC * MyProc
Definition proc.c:71
Definition proc.h:179
XLogRecPtr waitLSN
Definition proc.h:341
dlist_node syncRepLinks
Definition proc.h:343
int syncRepState
Definition proc.h:342
Latch procLatch
Definition proc.h:256
uint8 syncrep_method
Definition syncrep.h:68
char member_names[FLEXIBLE_ARRAY_MEMBER]
Definition syncrep.h:71
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
slock_t mutex
XLogRecPtr flush
WalSndState state
int sync_standby_priority
dlist_node * cur
Definition ilist.h:179
dlist_node * cur
Definition ilist.h:200
static int SyncRepWaitMode
Definition syncrep.c:99
void SyncRepInitConfig(void)
Definition syncrep.c:454
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
Definition syncrep.c:149
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
Definition syncrep.c:595
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
Definition syncrep.c:702
void assign_synchronous_commit(int newval, void *extra)
Definition syncrep.c:1132
void assign_synchronous_standby_names(const char *newval, void *extra)
Definition syncrep.c:1126
static int standby_priority_comparator(const void *a, const void *b)
Definition syncrep.c:841
static int SyncRepWakeQueue(bool all, int mode)
Definition syncrep.c:915
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:763
void SyncRepReleaseWaiters(void)
Definition syncrep.c:483
void SyncRepUpdateSyncStandbysDefined(void)
Definition syncrep.c:972
static bool announce_next_takeover
Definition syncrep.c:96
static int SyncRepGetStandbyPriority(void)
Definition syncrep.c:868
char * SyncRepStandbyNames
Definition syncrep.c:91
static void SyncRepQueueInsert(int mode)
Definition syncrep.c:381
static void SyncRepCancelWait(void)
Definition syncrep.c:415
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
Definition syncrep.c:1066
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
Definition syncrep.c:669
void SyncRepCleanupAtProcExit(void)
Definition syncrep.c:425
static int cmp_lsn(const void *a, const void *b)
Definition syncrep.c:747
#define SyncStandbysDefined()
Definition syncrep.c:93
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27
#define SyncRepRequested()
Definition syncrep.h:18
#define SYNC_REP_NO_WAIT
Definition syncrep.h:22
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAITING
Definition syncrep.h:31
#define SYNC_REP_WAIT_COMPLETE
Definition syncrep.h:32
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_NOT_WAITING
Definition syncrep.h:30
int syncrep_yyparse(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
void syncrep_scanner_finish(yyscan_t yyscanner)
void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp)
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
WalSnd * MyWalSnd
Definition walsender.c:132
int max_wal_senders
Definition walsender.c:141
bool am_cascading_walsender
Definition walsender.c:136
WalSndCtlData * WalSndCtl
Definition walsender.c:121
#define SYNC_STANDBY_DEFINED
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_STOPPING
#define SYNC_STANDBY_INIT
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
Definition xact.h:73
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
Definition xact.h:76
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
Definition xact.h:75
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28