93 #define SyncStandbysDefined() \
94 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
122 static int cmp_lsn(
const void *
a,
const void *
b);
124 #ifdef USE_ASSERT_CHECKING
125 static bool SyncRepQueueIsOrderedByLSN(
int mode);
151 char *new_status = NULL;
152 const char *old_status;
200 lsn <= WalSndCtl->lsn[
mode])
222 new_status = (
char *)
palloc(
len + 32 + 1);
223 memcpy(new_status, old_status,
len);
224 sprintf(new_status +
len,
" waiting for %X/%X",
227 new_status[
len] =
'\0';
268 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
269 errmsg(
"canceling the wait for synchronous replication and terminating connection due to administrator command"),
270 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
286 (
errmsg(
"canceling wait for synchronous replication due to user request"),
287 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
429 (
errmsg_internal(
"standby \"%s\" now has synchronous standby priority %u",
496 (
errmsg(
"standby \"%s\" is now a synchronous standby with priority %u",
500 (
errmsg(
"standby \"%s\" is now a candidate for quorum synchronous standby",
508 if (!got_recptr || !am_sync)
537 elog(
DEBUG3,
"released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
575 for (
i = 0;
i < num_standbys;
i++)
577 if (sync_standbys[
i].is_me)
589 num_standbys < SyncRepConfig->num_sync)
591 pfree(sync_standbys);
611 sync_standbys, num_standbys);
616 sync_standbys, num_standbys,
620 pfree(sync_standbys);
641 for (
i = 0;
i < num_standbys;
i++)
674 Assert(nth > 0 && nth <= num_standbys);
680 for (
i = 0;
i < num_standbys;
i++)
682 write_array[
i] = sync_standbys[
i].
write;
683 flush_array[
i] = sync_standbys[
i].
flush;
684 apply_array[
i] = sync_standbys[
i].
apply;
693 *writePtr = write_array[nth - 1];
694 *flushPtr = flush_array[nth - 1];
695 *applyPtr = apply_array[nth - 1];
713 else if (lsn1 == lsn2)
750 stby = *standbys + n;
835 const char *standby_name;
853 strcmp(standby_name,
"*") == 0)
858 standby_name += strlen(standby_name) + 1;
950 if (!sync_standbys_defined)
971 #ifdef USE_ASSERT_CHECKING
973 SyncRepQueueIsOrderedByLSN(
int mode)
1035 GUC_check_errmsg(
"number of synchronous standbys (%d) must be greater than zero",
1047 *extra = (
void *) pconf;
#define pg_read_barrier()
#define pg_write_barrier()
elog(ERROR, "%s: %s", p2, msg)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 InterruptHoldoffCount
volatile sig_atomic_t QueryCancelPending
volatile sig_atomic_t ProcDiePending
void GUC_check_errcode(int sqlerrcode)
void * guc_malloc(int elevel, size_t size)
#define GUC_check_errdetail
static void dlist_insert_after(dlist_node *after, dlist_node *node)
#define dlist_foreach(iter, lhead)
static void dlist_delete_thoroughly(dlist_node *node)
static bool dlist_node_is_detached(const dlist_node *node)
#define dlist_reverse_foreach(iter, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
#define dlist_foreach_modify(iter, lhead)
#define dlist_container(type, membername, ptr)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void pfree(void *pointer)
static PgChecksumMode mode
static rewind_source * source
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
CommandDest whereToSendOutput
const char * get_ps_display(int *displen)
bool update_process_title
void set_ps_display(const char *activity)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char member_names[FLEXIBLE_ARRAY_MEMBER]
int sync_standby_priority
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
bool sync_standbys_defined
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int sync_standby_priority
static int SyncRepWaitMode
void SyncRepInitConfig(void)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
void assign_synchronous_commit(int newval, void *extra)
void assign_synchronous_standby_names(const char *newval, void *extra)
static int standby_priority_comparator(const void *a, const void *b)
static int SyncRepWakeQueue(bool all, int mode)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
void SyncRepUpdateSyncStandbysDefined(void)
static bool announce_next_takeover
static int SyncRepGetStandbyPriority(void)
char * SyncRepStandbyNames
static void SyncRepQueueInsert(int mode)
static void SyncRepCancelWait(void)
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
void SyncRepCleanupAtProcExit(void)
static int cmp_lsn(const void *a, const void *b)
#define SyncStandbysDefined()
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
void syncrep_scanner_init(const char *str)
#define SyncRepRequested()
void syncrep_scanner_finish(void)
#define SYNC_REP_WAIT_WRITE
int syncrep_yyparse(void)
PGDLLIMPORT SyncRepConfigData * syncrep_parse_result
#define SYNC_REP_WAIT_COMPLETE
#define SYNC_REP_WAIT_FLUSH
PGDLLIMPORT char * syncrep_parse_error_msg
#define SYNC_REP_NOT_WAITING
#define SYNC_REP_WAIT_APPLY
bool am_cascading_walsender
WalSndCtlData * WalSndCtl
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr