93 #define SyncStandbysDefined() \
94 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
122 static int cmp_lsn(
const void *
a,
const void *
b);
124 #ifdef USE_ASSERT_CHECKING
125 static bool SyncRepQueueIsOrderedByLSN(
int mode);
198 lsn <= WalSndCtl->lsn[
mode])
261 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
262 errmsg(
"canceling the wait for synchronous replication and terminating connection due to administrator command"),
263 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
279 (
errmsg(
"canceling wait for synchronous replication due to user request"),
280 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
419 (
errmsg_internal(
"standby \"%s\" now has synchronous standby priority %u",
486 (
errmsg(
"standby \"%s\" is now a synchronous standby with priority %u",
490 (
errmsg(
"standby \"%s\" is now a candidate for quorum synchronous standby",
498 if (!got_recptr || !am_sync)
527 elog(
DEBUG3,
"released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
565 for (
i = 0;
i < num_standbys;
i++)
567 if (sync_standbys[
i].is_me)
579 num_standbys < SyncRepConfig->num_sync)
581 pfree(sync_standbys);
601 sync_standbys, num_standbys);
606 sync_standbys, num_standbys,
610 pfree(sync_standbys);
631 for (
i = 0;
i < num_standbys;
i++)
664 Assert(nth > 0 && nth <= num_standbys);
670 for (
i = 0;
i < num_standbys;
i++)
672 write_array[
i] = sync_standbys[
i].
write;
673 flush_array[
i] = sync_standbys[
i].
flush;
674 apply_array[
i] = sync_standbys[
i].
apply;
683 *writePtr = write_array[nth - 1];
684 *flushPtr = flush_array[nth - 1];
685 *applyPtr = apply_array[nth - 1];
703 else if (lsn1 == lsn2)
740 stby = *standbys + n;
825 const char *standby_name;
843 strcmp(standby_name,
"*") == 0)
848 standby_name += strlen(standby_name) + 1;
940 if (!sync_standbys_defined)
961 #ifdef USE_ASSERT_CHECKING
963 SyncRepQueueIsOrderedByLSN(
int mode)
1025 GUC_check_errmsg(
"number of synchronous standbys (%d) must be greater than zero",
1037 *extra = (
void *) pconf;
#define pg_read_barrier()
#define pg_write_barrier()
elog(ERROR, "%s: %s", p2, msg)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 InterruptHoldoffCount
volatile sig_atomic_t QueryCancelPending
volatile sig_atomic_t ProcDiePending
void GUC_check_errcode(int sqlerrcode)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
static void dlist_insert_after(dlist_node *after, dlist_node *node)
#define dlist_foreach(iter, lhead)
static void dlist_delete_thoroughly(dlist_node *node)
static bool dlist_node_is_detached(const dlist_node *node)
#define dlist_reverse_foreach(iter, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
#define dlist_foreach_modify(iter, lhead)
#define dlist_container(type, membername, ptr)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
static PgChecksumMode mode
static rewind_source * source
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
CommandDest whereToSendOutput
void set_ps_display_remove_suffix(void)
void set_ps_display_suffix(const char *suffix)
bool update_process_title
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char member_names[FLEXIBLE_ARRAY_MEMBER]
int sync_standby_priority
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
bool sync_standbys_defined
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int sync_standby_priority
static int SyncRepWaitMode
void SyncRepInitConfig(void)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
void assign_synchronous_commit(int newval, void *extra)
void assign_synchronous_standby_names(const char *newval, void *extra)
static int standby_priority_comparator(const void *a, const void *b)
static int SyncRepWakeQueue(bool all, int mode)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
void SyncRepUpdateSyncStandbysDefined(void)
static bool announce_next_takeover
static int SyncRepGetStandbyPriority(void)
char * SyncRepStandbyNames
static void SyncRepQueueInsert(int mode)
static void SyncRepCancelWait(void)
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
void SyncRepCleanupAtProcExit(void)
static int cmp_lsn(const void *a, const void *b)
#define SyncStandbysDefined()
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
void syncrep_scanner_init(const char *str)
#define SyncRepRequested()
void syncrep_scanner_finish(void)
#define SYNC_REP_WAIT_WRITE
int syncrep_yyparse(void)
PGDLLIMPORT SyncRepConfigData * syncrep_parse_result
#define SYNC_REP_WAIT_COMPLETE
#define SYNC_REP_WAIT_FLUSH
PGDLLIMPORT char * syncrep_parse_error_msg
#define SYNC_REP_NOT_WAITING
#define SYNC_REP_WAIT_APPLY
bool am_cascading_walsender
WalSndCtlData * WalSndCtl
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr