92#define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
121static int cmp_lsn(
const void *
a,
const void *
b);
123#ifdef USE_ASSERT_CHECKING
124static bool SyncRepQueueIsOrderedByLSN(
int mode);
210 lsn <= WalSndCtl->lsn[
mode])
216 else if (lsn <= WalSndCtl->lsn[
mode])
303 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
304 errmsg(
"canceling the wait for synchronous replication and terminating connection due to administrator command"),
305 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
321 (
errmsg(
"canceling wait for synchronous replication due to user request"),
322 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
332 WAIT_EVENT_SYNC_REP);
461 (
errmsg_internal(
"standby \"%s\" now has synchronous standby priority %d",
528 (
errmsg(
"standby \"%s\" is now a synchronous standby with priority %d",
532 (
errmsg(
"standby \"%s\" is now a candidate for quorum synchronous standby",
540 if (!got_recptr || !am_sync)
569 elog(
DEBUG3,
"released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
607 for (
i = 0;
i < num_standbys;
i++)
609 if (sync_standbys[
i].is_me)
621 num_standbys < SyncRepConfig->num_sync)
623 pfree(sync_standbys);
643 sync_standbys, num_standbys);
648 sync_standbys, num_standbys,
652 pfree(sync_standbys);
673 for (
i = 0;
i < num_standbys;
i++)
706 Assert(nth > 0 && nth <= num_standbys);
712 for (
i = 0;
i < num_standbys;
i++)
714 write_array[
i] = sync_standbys[
i].
write;
715 flush_array[
i] = sync_standbys[
i].
flush;
716 apply_array[
i] = sync_standbys[
i].
apply;
725 *writePtr = write_array[nth - 1];
726 *flushPtr = flush_array[nth - 1];
727 *applyPtr = apply_array[nth - 1];
777 stby = *standbys + n;
862 const char *standby_name;
880 strcmp(standby_name,
"*") == 0)
885 standby_name += strlen(standby_name) + 1;
968 if (sync_standbys_defined !=
978 if (!sync_standbys_defined)
1022#ifdef USE_ASSERT_CHECKING
1024SyncRepQueueIsOrderedByLSN(
int mode)
1068 char *syncrep_parse_error_msg = NULL;
1072 parse_rc =
syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
1075 if (parse_rc != 0 || syncrep_parse_result == NULL)
1078 if (syncrep_parse_error_msg)
1082 "synchronous_standby_names");
1086 if (syncrep_parse_result->
num_sync <= 0)
1088 GUC_check_errmsg(
"number of synchronous standbys (%d) must be greater than zero",
1098 memcpy(pconf, syncrep_parse_result, syncrep_parse_result->
config_size);
#define pg_read_barrier()
#define pg_write_barrier()
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 InterruptHoldoffCount
volatile sig_atomic_t QueryCancelPending
volatile sig_atomic_t ProcDiePending
void GUC_check_errcode(int sqlerrcode)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
Assert(PointerIsAligned(start, uint64))
static void dlist_insert_after(dlist_node *after, dlist_node *node)
#define dlist_foreach(iter, lhead)
static void dlist_delete_thoroughly(dlist_node *node)
static bool dlist_node_is_detached(const dlist_node *node)
#define dlist_reverse_foreach(iter, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
#define dlist_foreach_modify(iter, lhead)
#define dlist_container(type, membername, ptr)
static int pg_cmp_u64(uint64 a, uint64 b)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
static PgChecksumMode mode
static rewind_source * source
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
CommandDest whereToSendOutput
void set_ps_display_remove_suffix(void)
void set_ps_display_suffix(const char *suffix)
bool update_process_title
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char member_names[FLEXIBLE_ARRAY_MEMBER]
int sync_standby_priority
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]
bits8 sync_standbys_status
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int sync_standby_priority
static int SyncRepWaitMode
void SyncRepInitConfig(void)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
void assign_synchronous_commit(int newval, void *extra)
void assign_synchronous_standby_names(const char *newval, void *extra)
static int standby_priority_comparator(const void *a, const void *b)
static int SyncRepWakeQueue(bool all, int mode)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
void SyncRepUpdateSyncStandbysDefined(void)
static bool announce_next_takeover
static int SyncRepGetStandbyPriority(void)
char * SyncRepStandbyNames
static void SyncRepQueueInsert(int mode)
static void SyncRepCancelWait(void)
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
void SyncRepCleanupAtProcExit(void)
static int cmp_lsn(const void *a, const void *b)
#define SyncStandbysDefined()
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
#define SYNC_REP_WAIT_WRITE
#define SYNC_REP_WAIT_COMPLETE
#define SYNC_REP_WAIT_FLUSH
#define SYNC_REP_NOT_WAITING
int syncrep_yyparse(SyncRepConfigData **syncrep_parse_result_p, char **syncrep_parse_error_msg_p, yyscan_t yyscanner)
#define SYNC_REP_WAIT_APPLY
void syncrep_scanner_finish(yyscan_t yyscanner)
void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp)
#define WL_POSTMASTER_DEATH
bool am_cascading_walsender
WalSndCtlData * WalSndCtl
#define SYNC_STANDBY_DEFINED
#define SYNC_STANDBY_INIT
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr