92#define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
121static int cmp_lsn(
const void *
a,
const void *
b);
123#ifdef USE_ASSERT_CHECKING
124static bool SyncRepQueueIsOrderedByLSN(
int mode);
197 lsn <= WalSndCtl->lsn[
mode])
260 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
261 errmsg(
"canceling the wait for synchronous replication and terminating connection due to administrator command"),
262 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
278 (
errmsg(
"canceling wait for synchronous replication due to user request"),
279 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
289 WAIT_EVENT_SYNC_REP);
418 (
errmsg_internal(
"standby \"%s\" now has synchronous standby priority %d",
485 (
errmsg(
"standby \"%s\" is now a synchronous standby with priority %d",
489 (
errmsg(
"standby \"%s\" is now a candidate for quorum synchronous standby",
497 if (!got_recptr || !am_sync)
526 elog(
DEBUG3,
"released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
564 for (
i = 0;
i < num_standbys;
i++)
566 if (sync_standbys[
i].is_me)
578 num_standbys < SyncRepConfig->num_sync)
580 pfree(sync_standbys);
600 sync_standbys, num_standbys);
605 sync_standbys, num_standbys,
609 pfree(sync_standbys);
630 for (
i = 0;
i < num_standbys;
i++)
663 Assert(nth > 0 && nth <= num_standbys);
669 for (
i = 0;
i < num_standbys;
i++)
671 write_array[
i] = sync_standbys[
i].
write;
672 flush_array[
i] = sync_standbys[
i].
flush;
673 apply_array[
i] = sync_standbys[
i].
apply;
682 *writePtr = write_array[nth - 1];
683 *flushPtr = flush_array[nth - 1];
684 *applyPtr = apply_array[nth - 1];
734 stby = *standbys + n;
819 const char *standby_name;
837 strcmp(standby_name,
"*") == 0)
842 standby_name += strlen(standby_name) + 1;
934 if (!sync_standbys_defined)
955#ifdef USE_ASSERT_CHECKING
957SyncRepQueueIsOrderedByLSN(
int mode)
1015 "synchronous_standby_names");
1021 GUC_check_errmsg(
"number of synchronous standbys (%d) must be greater than zero",
#define pg_read_barrier()
#define pg_write_barrier()
#define Assert(condition)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 InterruptHoldoffCount
volatile sig_atomic_t QueryCancelPending
volatile sig_atomic_t ProcDiePending
void GUC_check_errcode(int sqlerrcode)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
static void dlist_insert_after(dlist_node *after, dlist_node *node)
#define dlist_foreach(iter, lhead)
static void dlist_delete_thoroughly(dlist_node *node)
static bool dlist_node_is_detached(const dlist_node *node)
#define dlist_reverse_foreach(iter, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
#define dlist_foreach_modify(iter, lhead)
#define dlist_container(type, membername, ptr)
static int pg_cmp_u64(uint64 a, uint64 b)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
static PgChecksumMode mode
static rewind_source * source
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
CommandDest whereToSendOutput
void set_ps_display_remove_suffix(void)
void set_ps_display_suffix(const char *suffix)
bool update_process_title
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char member_names[FLEXIBLE_ARRAY_MEMBER]
int sync_standby_priority
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
bool sync_standbys_defined
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int sync_standby_priority
static int SyncRepWaitMode
void SyncRepInitConfig(void)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
void assign_synchronous_commit(int newval, void *extra)
void assign_synchronous_standby_names(const char *newval, void *extra)
static int standby_priority_comparator(const void *a, const void *b)
static int SyncRepWakeQueue(bool all, int mode)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
void SyncRepUpdateSyncStandbysDefined(void)
static bool announce_next_takeover
static int SyncRepGetStandbyPriority(void)
char * SyncRepStandbyNames
static void SyncRepQueueInsert(int mode)
static void SyncRepCancelWait(void)
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
void SyncRepCleanupAtProcExit(void)
static int cmp_lsn(const void *a, const void *b)
#define SyncStandbysDefined()
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
PGDLLIMPORT SyncRepConfigData * syncrep_parse_result
#define SYNC_REP_WAIT_COMPLETE
#define SYNC_REP_WAIT_FLUSH
PGDLLIMPORT char * syncrep_parse_error_msg
#define SYNC_REP_NOT_WAITING
#define SYNC_REP_WAIT_APPLY
int syncrep_yyparse(yyscan_t yyscanner)
void syncrep_scanner_finish(yyscan_t yyscanner)
void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp)
bool am_cascading_walsender
WalSndCtlData * WalSndCtl
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr