PostgreSQL Source Code git master
Loading...
Searching...
No Matches
syncrep.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for syncrep.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  SyncRepStandbyData
 
struct  SyncRepConfigData
 

Macros

#define SyncRepRequested()    (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
 
#define SYNC_REP_NO_WAIT   (-1)
 
#define SYNC_REP_WAIT_WRITE   0
 
#define SYNC_REP_WAIT_FLUSH   1
 
#define SYNC_REP_WAIT_APPLY   2
 
#define NUM_SYNC_REP_WAIT_MODE   3
 
#define SYNC_REP_NOT_WAITING   0
 
#define SYNC_REP_WAITING   1
 
#define SYNC_REP_WAIT_COMPLETE   2
 
#define SYNC_REP_PRIORITY   0
 
#define SYNC_REP_QUORUM   1
 

Typedefs

typedef struct SyncRepStandbyData SyncRepStandbyData
 
typedef struct SyncRepConfigData SyncRepConfigData
 
typedef void * yyscan_t
 

Functions

void SyncRepWaitForLSN (XLogRecPtr lsn, bool commit)
 
void SyncRepCleanupAtProcExit (void)
 
void SyncRepInitConfig (void)
 
void SyncRepReleaseWaiters (void)
 
int SyncRepGetCandidateStandbys (SyncRepStandbyData **standbys)
 
void SyncRepUpdateSyncStandbysDefined (void)
 
int syncrep_yyparse (SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
 
int syncrep_yylex (union YYSTYPE *yylval_param, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
 
void syncrep_yyerror (SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner, const char *message)
 
void syncrep_scanner_init (const char *str, yyscan_t *yyscannerp)
 
void syncrep_scanner_finish (yyscan_t yyscanner)
 

Variables

PGDLLIMPORT SyncRepConfigData * SyncRepConfig
 
PGDLLIMPORT char * SyncRepStandbyNames
 

Macro Definition Documentation

◆ NUM_SYNC_REP_WAIT_MODE

#define NUM_SYNC_REP_WAIT_MODE   3

Definition at line 27 of file syncrep.h.

◆ SYNC_REP_NO_WAIT

#define SYNC_REP_NO_WAIT   (-1)

Definition at line 22 of file syncrep.h.

◆ SYNC_REP_NOT_WAITING

#define SYNC_REP_NOT_WAITING   0

Definition at line 30 of file syncrep.h.

◆ SYNC_REP_PRIORITY

#define SYNC_REP_PRIORITY   0

Definition at line 35 of file syncrep.h.

◆ SYNC_REP_QUORUM

#define SYNC_REP_QUORUM   1

Definition at line 36 of file syncrep.h.

◆ SYNC_REP_WAIT_APPLY

#define SYNC_REP_WAIT_APPLY   2

Definition at line 25 of file syncrep.h.

◆ SYNC_REP_WAIT_COMPLETE

#define SYNC_REP_WAIT_COMPLETE   2

Definition at line 32 of file syncrep.h.

◆ SYNC_REP_WAIT_FLUSH

#define SYNC_REP_WAIT_FLUSH   1

Definition at line 24 of file syncrep.h.

◆ SYNC_REP_WAIT_WRITE

#define SYNC_REP_WAIT_WRITE   0

Definition at line 23 of file syncrep.h.

◆ SYNC_REP_WAITING

#define SYNC_REP_WAITING   1

Definition at line 31 of file syncrep.h.

◆ SyncRepRequested

Definition at line 18 of file syncrep.h.

42{
43 /* Copies of relevant fields from WalSnd shared-memory struct */
44 pid_t pid;
46 XLogRecPtr flush;
47 XLogRecPtr apply;
48 int sync_standby_priority;
49 /* Index of this walsender in the WalSnd shared-memory array */
50 int walsnd_index;
51 /* This flag indicates whether this struct is about our own process */
52 bool is_me;
54
55/*
56 * Struct for the configuration of synchronous replication.
57 *
58 * Note: this must be a flat representation that can be held in a single
59 * chunk of malloc'd memory, so that it can be stored as the "extra" data
60 * for the synchronous_standby_names GUC.
61 */
62typedef struct SyncRepConfigData
63{
64 int config_size; /* total size of this struct, in bytes */
65 int num_sync; /* number of sync standbys that we need to
66 * wait for */
67 uint8 syncrep_method; /* method to choose sync standbys */
68 int nmembers; /* number of members in the following list */
69 /* member_names contains nmembers consecutive nul-terminated C strings */
72
74
75/* user-settable parameters for synchronous replication */
77
78/* called by user backend */
79extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
80
81/* called at backend exit */
82extern void SyncRepCleanupAtProcExit(void);
83
84/* called by wal sender */
85extern void SyncRepInitConfig(void);
86extern void SyncRepReleaseWaiters(void);
87
88/* called by wal sender and user backend */
90
91/* called by checkpointer */
92extern void SyncRepUpdateSyncStandbysDefined(void);
93
94/*
95 * Internal functions for parsing synchronous_standby_names grammar,
96 * in syncrep_gram.y and syncrep_scanner.l
97 */
98union YYSTYPE;
99typedef void *yyscan_t;
101extern int syncrep_yylex(union YYSTYPE *yylval_param, char **syncrep_parse_error_msg_p, yyscan_t yyscanner);
102extern void syncrep_yyerror(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner, const char *message);
103extern void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp);
104extern void syncrep_scanner_finish(yyscan_t yyscanner);
105
106#endif /* _SYNCREP_H */
#define PGDLLIMPORT
Definition c.h:1421
uint8_t uint8
Definition c.h:622
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
void * yyscan_t
Definition cubedata.h:65
const char * str
#define write(a, b, c)
Definition win32.h:14
static int fb(int x)
int YYSTYPE
uint8 syncrep_method
Definition syncrep.h:68
char member_names[FLEXIBLE_ARRAY_MEMBER]
Definition syncrep.h:71
void SyncRepInitConfig(void)
Definition syncrep.c:455
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
Definition syncrep.c:149
void * yyscan_t
Definition syncrep.h:100
void syncrep_scanner_finish(yyscan_t yyscanner)
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:764
void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp)
void SyncRepReleaseWaiters(void)
Definition syncrep.c:484
PGDLLIMPORT SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
void syncrep_yyerror(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner, const char *message)
PGDLLIMPORT char * SyncRepStandbyNames
Definition syncrep.c:91
void SyncRepUpdateSyncStandbysDefined(void)
Definition syncrep.c:973
int syncrep_yylex(union YYSTYPE *yylval_param, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
int syncrep_yyparse(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
void SyncRepCleanupAtProcExit(void)
Definition syncrep.c:426
uint64 XLogRecPtr
Definition xlogdefs.h:21

Typedef Documentation

◆ SyncRepConfigData

◆ SyncRepStandbyData

◆ yyscan_t

Definition at line 100 of file syncrep.h.

Function Documentation

◆ syncrep_scanner_finish()

void syncrep_scanner_finish ( yyscan_t  yyscanner)
extern

Definition at line 189 of file syncrep_scanner.l.

190{
191 pfree(yyextra);
192 yylex_destroy(yyscanner);
193}
void pfree(void *pointer)
Definition mcxt.c:1616
#define yyextra

References fb(), pfree(), and yyextra.

Referenced by check_synchronous_standby_names().

◆ syncrep_scanner_init()

void syncrep_scanner_init ( const char *  str,
yyscan_t *  yyscannerp 
)
extern

Definition at line 173 of file syncrep_scanner.l.

174{
175 yyscan_t yyscanner;
177
178 if (yylex_init(yyscannerp) != 0)
179 elog(ERROR, "yylex_init() failed: %m");
180
181 yyscanner = *yyscannerp;
182
183 yyset_extra(yyext, yyscanner);
184
185 yy_scan_string(str, yyscanner);
186}
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define palloc0_object(type)
Definition fe_memutils.h:75

References elog, ERROR, fb(), palloc0_object, and str.

Referenced by check_synchronous_standby_names().

◆ syncrep_yyerror()

void syncrep_yyerror ( SyncRepConfigData **  syncrep_parse_result_p,
char **  syncrep_parse_error_msg_p,
yyscan_t  yyscanner,
const char *  message 
)
extern

Definition at line 156 of file syncrep_scanner.l.

157{
158 struct yyguts_t *yyg = (struct yyguts_t *) yyscanner; /* needed for yytext
159 * macro */
160
161 /* report only the first error in a parse operation */
163 return;
164 if (yytext[0])
165 *syncrep_parse_error_msg_p = psprintf("%s at or near \"%s\"",
166 message, yytext);
167 else
168 *syncrep_parse_error_msg_p = psprintf("%s at end of input",
169 message);
170}
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References fb(), and psprintf().

◆ syncrep_yylex()

int syncrep_yylex ( union YYSTYPE *  yylval_param,
char **  syncrep_parse_error_msg_p,
yyscan_t  yyscanner 
)
extern

◆ syncrep_yyparse()

int syncrep_yyparse ( SyncRepConfigData **  syncrep_parse_result_p,
char **  syncrep_parse_error_msg_p,
yyscan_t  yyscanner 
)
extern

◆ SyncRepCleanupAtProcExit()

void SyncRepCleanupAtProcExit ( void  )
extern

Definition at line 426 of file syncrep.c.

427{
428 /*
429 * First check if we are removed from the queue without the lock to not
430 * slow down backend exit.
431 */
433 {
435
436 /* maybe we have just been removed, so recheck */
439
441 }
442}
static void dlist_delete_thoroughly(dlist_node *node)
Definition ilist.h:416
static bool dlist_node_is_detached(const dlist_node *node)
Definition ilist.h:525
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_EXCLUSIVE
Definition lwlock.h:104
PGPROC * MyProc
Definition proc.c:71
dlist_node syncRepLinks
Definition proc.h:343

References dlist_delete_thoroughly(), dlist_node_is_detached(), fb(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProc, and PGPROC::syncRepLinks.

Referenced by ProcKill().

◆ SyncRepGetCandidateStandbys()

int SyncRepGetCandidateStandbys ( SyncRepStandbyData **  standbys)
extern

Definition at line 764 of file syncrep.c.

765{
766 int i;
767 int n;
768
769 /* Create result array */
771
772 /* Quick exit if sync replication is not requested */
773 if (SyncRepConfig == NULL)
774 return 0;
775
776 /* Collect raw data from shared memory */
777 n = 0;
778 for (i = 0; i < max_wal_senders; i++)
779 {
780 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
781 * rearrangement */
783 WalSndState state; /* not included in SyncRepStandbyData */
784
786 stby = *standbys + n;
787
788 SpinLockAcquire(&walsnd->mutex);
789 stby->pid = walsnd->pid;
790 state = walsnd->state;
791 stby->write = walsnd->write;
792 stby->flush = walsnd->flush;
793 stby->apply = walsnd->apply;
794 stby->sync_standby_priority = walsnd->sync_standby_priority;
795 SpinLockRelease(&walsnd->mutex);
796
797 /* Must be active */
798 if (stby->pid == 0)
799 continue;
800
801 /* Must be streaming or stopping */
804 continue;
805
806 /* Must be synchronous */
807 if (stby->sync_standby_priority == 0)
808 continue;
809
810 /* Must have a valid flush position */
811 if (!XLogRecPtrIsValid(stby->flush))
812 continue;
813
814 /* OK, it's a candidate */
815 stby->walsnd_index = i;
816 stby->is_me = (walsnd == MyWalSnd);
817 n++;
818 }
819
820 /*
821 * In quorum mode, we return all the candidates. In priority mode, if we
822 * have too many candidates then return only the num_sync ones of highest
823 * priority.
824 */
827 {
828 /* Sort by priority ... */
829 qsort(*standbys, n, sizeof(SyncRepStandbyData),
831 /* ... then report just the first num_sync ones */
833 }
834
835 return n;
836}
#define palloc_array(type, count)
Definition fe_memutils.h:76
int i
Definition isn.c:77
#define qsort(a, b, c, d)
Definition port.h:495
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static int standby_priority_comparator(const void *a, const void *b)
Definition syncrep.c:842
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
WalSnd * MyWalSnd
Definition walsender.c:132
int max_wal_senders
Definition walsender.c:141
WalSndCtlData * WalSndCtl
Definition walsender.c:121
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_STOPPING
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References fb(), i, max_wal_senders, MyWalSnd, SyncRepConfigData::num_sync, palloc_array, qsort, SpinLockAcquire(), SpinLockRelease(), standby_priority_comparator(), SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, WalSndCtl, WalSndCtlData::walsnds, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, and XLogRecPtrIsValid.

Referenced by pg_stat_get_wal_senders(), and SyncRepGetSyncRecPtr().

◆ SyncRepInitConfig()

void SyncRepInitConfig ( void  )
extern

Definition at line 455 of file syncrep.c.

456{
457 int priority;
458
459 /*
460 * Determine if we are a potential sync standby and remember the result
461 * for handling replies from standby.
462 */
465 {
469
471 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
473 }
474}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
#define ereport(elevel,...)
Definition elog.h:152
char * application_name
Definition guc_tables.c:589
slock_t mutex
int sync_standby_priority
static int SyncRepGetStandbyPriority(void)
Definition syncrep.c:869

References application_name, DEBUG1, ereport, errmsg_internal(), fb(), WalSnd::mutex, MyWalSnd, SpinLockAcquire(), SpinLockRelease(), WalSnd::sync_standby_priority, and SyncRepGetStandbyPriority().

Referenced by StartLogicalReplication(), StartReplication(), and WalSndHandleConfigReload().

◆ SyncRepReleaseWaiters()

void SyncRepReleaseWaiters ( void  )
extern

Definition at line 484 of file syncrep.c.

485{
490 bool got_recptr;
491 bool am_sync;
492 int numwrite = 0;
493 int numflush = 0;
494 int numapply = 0;
495
496 /*
497 * If this WALSender is serving a standby that is not on the list of
498 * potential sync standbys then we have nothing to do. If we are still
499 * starting up, still running base backup or the current flush position is
500 * still invalid, then leave quickly also. Streaming or stopping WAL
501 * senders are allowed to release waiters.
502 */
507 {
509 return;
510 }
511
512 /*
513 * We're a potential sync standby. Release waiters if there are enough
514 * sync standbys and we are considered as sync.
515 */
517
518 /*
519 * Check whether we are a sync standby or not, and calculate the synced
520 * positions among all sync standbys. (Note: although this step does not
521 * of itself require holding SyncRepLock, it seems like a good idea to do
522 * it after acquiring the lock. This ensures that the WAL pointers we use
523 * to release waiters are newer than any previous execution of this
524 * routine used.)
525 */
527
528 /*
529 * If we are managing a sync standby, though we weren't prior to this,
530 * then announce we are now a sync standby.
531 */
533 {
535
537 ereport(LOG,
538 (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
540 else
541 ereport(LOG,
542 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
544 }
545
546 /*
547 * If the number of sync standbys is less than requested or we aren't
548 * managing a sync standby then just leave.
549 */
550 if (!got_recptr || !am_sync)
551 {
554 return;
555 }
556
557 /*
558 * Set the lsn first so that when we wake backends they will release up to
559 * this location.
560 */
562 {
565 }
567 {
570 }
572 {
575 }
576
578
579 elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
583}
#define LOG
Definition elog.h:32
#define DEBUG3
Definition elog.h:29
static char * errmsg
XLogRecPtr flush
WalSndState state
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
Definition syncrep.c:596
static int SyncRepWakeQueue(bool all, int mode)
Definition syncrep.c:916
static bool announce_next_takeover
Definition syncrep.c:96
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References announce_next_takeover, application_name, DEBUG3, elog, ereport, errmsg, fb(), WalSnd::flush, LOG, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyWalSnd, WalSnd::state, SYNC_REP_PRIORITY, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetSyncRecPtr(), SyncRepWakeQueue(), WalSndCtl, WALSNDSTATE_STOPPING, WALSNDSTATE_STREAMING, and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage(), and WalSndHandleConfigReload().

◆ SyncRepUpdateSyncStandbysDefined()

void SyncRepUpdateSyncStandbysDefined ( void  )
extern

Definition at line 973 of file syncrep.c.

974{
976
979 {
981
982 /*
983 * If synchronous_standby_names has been reset to empty, it's futile
984 * for backends to continue waiting. Since the user no longer wants
985 * synchronous replication, we'd better wake them up.
986 */
988 {
989 int i;
990
991 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
992 SyncRepWakeQueue(true, i);
993 }
994
995 /*
996 * Only allow people to join the queue when there are synchronous
997 * standbys defined. Without this interlock, there's a race
998 * condition: we might wake up all the current waiters; then, some
999 * backend that hasn't yet reloaded its config might go to sleep on
1000 * the queue (and never wake up). This prevents that.
1001 */
1004
1006 }
1008 {
1010
1011 /*
1012 * Note that there is no need to wake up the queues here. We would
1013 * reach this path only if SyncStandbysDefined() returns false, or it
1014 * would mean that some backends are waiting with the GUC set. See
1015 * SyncRepWaitForLSN().
1016 */
1018
1019 /*
1020 * Even if there is no sync standby defined, let the readers of this
1021 * information know that the sync standby data has been initialized.
1022 * This can just be done once, hence the previous check on
1023 * SYNC_STANDBY_INIT to avoid useless work.
1024 */
1026
1028 }
1029}
#define Assert(condition)
Definition c.h:943
#define SyncStandbysDefined()
Definition syncrep.c:93
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27
#define SYNC_STANDBY_DEFINED
#define SYNC_STANDBY_INIT

References Assert, fb(), i, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NUM_SYNC_REP_WAIT_MODE, SYNC_STANDBY_DEFINED, SYNC_STANDBY_INIT, WalSndCtlData::sync_standbys_status, SyncRepWakeQueue(), SyncStandbysDefined, and WalSndCtl.

Referenced by UpdateSharedMemoryConfig().

◆ SyncRepWaitForLSN()

void SyncRepWaitForLSN ( XLogRecPtr  lsn,
bool  commit 
)
extern

Definition at line 149 of file syncrep.c.

150{
151 int mode;
152
153 /*
154 * This should be called while holding interrupts during a transaction
155 * commit to prevent the follow-up shared memory queue cleanups to be
156 * influenced by external interruptions.
157 */
159
160 /*
161 * Fast exit if user has not requested sync replication, or there are no
162 * sync replication standby names defined.
163 *
164 * Since this routine gets called every commit time, it's important to
165 * exit quickly if sync replication is not requested.
166 *
167 * We check WalSndCtl->sync_standbys_status flag without the lock and exit
168 * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
169 * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
170 * replication requested).
171 *
172 * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
173 * while holding the lock, to check the flag and operate the sync rep
174 * queue atomically. This is necessary to avoid the race condition
175 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
176 * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
177 * don't touch the queue.
178 */
179 if (!SyncRepRequested() ||
180 ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
182 return;
183
184 /* Cap the level for anything other than commit to remote flush only. */
185 if (commit)
187 else
189
192
195
196 /*
197 * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
198 * SyncRepUpdateSyncStandbysDefined().
199 *
200 * Also check that the standby hasn't already replied. Unlikely race
201 * condition but we'll be fetching that cache line anyway so it's likely
202 * to be a low cost check.
203 *
204 * If the sync standby data has not been initialized yet
205 * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
206 * then do a direct GUC check.
207 */
209 {
212 {
214 return;
215 }
216 }
217 else if (lsn <= WalSndCtl->lsn[mode])
218 {
219 /*
220 * The LSN is older than what we need to wait for. The sync standby
221 * data has not been initialized yet, but we are OK to not wait
222 * because we know that there is no point in doing so based on the
223 * LSN.
224 */
226 return;
227 }
228 else if (!SyncStandbysDefined())
229 {
230 /*
231 * If we are here, the sync standby data has not been initialized yet,
232 * and the LSN is newer than what need to wait for, so we have fallen
233 * back to the best thing we could do in this case: a check on
234 * SyncStandbysDefined() to see if the GUC is set or not.
235 *
236 * When the GUC has a value, we wait until the checkpointer updates
237 * the status data because we cannot be sure yet if we should wait or
238 * not. Here, the GUC has *no* value, we are sure that there is no
239 * point to wait; this matters for example when initializing a
240 * cluster, where we should never wait, and no sync standbys is the
241 * default behavior.
242 */
244 return;
245 }
246
247 /*
248 * Set our waitLSN so WALSender will know when to wake us, and add
249 * ourselves to the queue.
250 */
251 MyProc->waitLSN = lsn;
256
257 /* Alter ps display to show waiting for sync rep. */
259 {
260 char buffer[32];
261
262 sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
263 set_ps_display_suffix(buffer);
264 }
265
266 /*
267 * Wait for specified LSN to be confirmed.
268 *
269 * Each proc has its own wait latch, so we perform a normal latch
270 * check/wait loop here.
271 */
272 for (;;)
273 {
274 int rc;
275
276 /* Must reset the latch before testing state. */
278
279 /*
280 * Acquiring the lock is not needed, the latch ensures proper
281 * barriers. If it looks like we're done, we must really be done,
282 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
283 * it will never update it again, so we can't be seeing a stale value
284 * in that case.
285 */
287 break;
288
289 /*
290 * If a wait for synchronous replication is pending, we can neither
291 * acknowledge the commit nor raise ERROR or FATAL. The latter would
292 * lead the client to believe that the transaction aborted, which is
293 * not true: it's already committed locally. The former is no good
294 * either: the client has requested synchronous replication, and is
295 * entitled to assume that an acknowledged commit is also replicated,
296 * which might not be true. So in this case we issue a WARNING (which
297 * some clients may be able to interpret) and shut off further output.
298 * We do NOT reset ProcDiePending, so that the process will die after
299 * the commit is cleaned up.
300 */
301 if (ProcDiePending)
302 {
303 if (ProcDieSenderPid != 0)
306 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
307 errdetail("The transaction has already committed locally, but might not have been replicated to the standby."),
308 errdetail_log("The transaction has already committed locally, but might not have been replicated to the standby. Signal sent by PID %d, UID %d.",
309 (int) ProcDieSenderPid,
310 (int) ProcDieSenderUid)));
311 else
314 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
315 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
318 break;
319 }
320
321 /*
322 * It's unclear what to do if a query cancel interrupt arrives. We
323 * can't actually abort at this point, but ignoring the interrupt
324 * altogether is not helpful, so we just terminate the wait with a
325 * suitable warning.
326 */
328 {
329 QueryCancelPending = false;
331 (errmsg("canceling wait for synchronous replication due to user request"),
332 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
334 break;
335 }
336
337 /*
338 * Wait on latch. Any condition that should wake us up will set the
339 * latch, so no need for timeout.
340 */
343
344 /*
345 * If the postmaster dies, we'll probably never get an acknowledgment,
346 * because all the wal sender processes will exit. So just bail out.
347 */
348 if (rc & WL_POSTMASTER_DEATH)
349 {
350 ProcDiePending = true;
353 break;
354 }
355 }
356
357 /*
358 * WalSender has checked our LSN and has removed us from queue. Clean up
359 * state and leave. It's OK to reset these shared memory fields without
360 * holding SyncRepLock, because any walsenders will ignore us anyway when
361 * we're not on the queue. We need a read barrier to make sure we see the
362 * changes to the queue link (this might be unnecessary without
363 * assertions, but better safe than sorry).
364 */
369
370 /* reset ps display to remove the suffix */
373}
#define pg_read_barrier()
Definition atomics.h:154
#define Min(x, y)
Definition c.h:1091
@ DestNone
Definition dest.h:87
int errcode(int sqlerrcode)
Definition elog.c:875
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
int int int errdetail_log(const char *fmt,...) pg_attribute_printf(1
volatile int ProcDieSenderPid
Definition globals.c:46
volatile uint32 InterruptHoldoffCount
Definition globals.c:43
volatile int ProcDieSenderUid
Definition globals.c:47
volatile sig_atomic_t QueryCancelPending
Definition globals.c:33
struct Latch * MyLatch
Definition globals.c:65
volatile sig_atomic_t ProcDiePending
Definition globals.c:34
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
static PgChecksumMode mode
#define sprintf
Definition port.h:262
CommandDest whereToSendOutput
Definition postgres.c:97
void set_ps_display_remove_suffix(void)
Definition ps_status.c:440
void set_ps_display_suffix(const char *suffix)
Definition ps_status.c:388
bool update_process_title
Definition ps_status.c:31
XLogRecPtr waitLSN
Definition proc.h:341
int syncRepState
Definition proc.h:342
static int SyncRepWaitMode
Definition syncrep.c:99
static void SyncRepQueueInsert(int mode)
Definition syncrep.c:382
static void SyncRepCancelWait(void)
Definition syncrep.c:416
#define SyncRepRequested()
Definition syncrep.h:18
#define SYNC_REP_WAITING
Definition syncrep.h:31
#define SYNC_REP_WAIT_COMPLETE
Definition syncrep.h:32
#define SYNC_REP_NOT_WAITING
Definition syncrep.h:30
#define WL_LATCH_SET
#define WL_POSTMASTER_DEATH
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References Assert, DestNone, dlist_node_is_detached(), ereport, errcode(), errdetail(), errdetail_log(), errmsg, fb(), InterruptHoldoffCount, InvalidXLogRecPtr, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), Min, mode, MyLatch, MyProc, pg_read_barrier, ProcDiePending, ProcDieSenderPid, ProcDieSenderUid, QueryCancelPending, ResetLatch(), set_ps_display_remove_suffix(), set_ps_display_suffix(), sprintf, SYNC_REP_NOT_WAITING, SYNC_REP_WAIT_COMPLETE, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAITING, SYNC_STANDBY_DEFINED, SYNC_STANDBY_INIT, WalSndCtlData::sync_standbys_status, SyncRepCancelWait(), PGPROC::syncRepLinks, SyncRepQueueInsert(), SyncRepRequested, PGPROC::syncRepState, SyncRepWaitMode, SyncStandbysDefined, update_process_title, WaitLatch(), PGPROC::waitLSN, WalSndCtl, WARNING, whereToSendOutput, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by EndPrepare(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

Variable Documentation

◆ SyncRepConfig

◆ SyncRepStandbyNames

PGDLLIMPORT char* SyncRepStandbyNames
extern

Definition at line 91 of file syncrep.c.