92 #define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
121 static int cmp_lsn(
const void *
a,
const void *
b);
123 #ifdef USE_ASSERT_CHECKING
124 static bool SyncRepQueueIsOrderedByLSN(
int mode);
150 char *new_status = NULL;
151 const char *old_status;
199 lsn <= WalSndCtl->lsn[
mode])
221 new_status = (
char *)
palloc(
len + 32 + 1);
222 memcpy(new_status, old_status,
len);
223 sprintf(new_status +
len,
" waiting for %X/%X",
226 new_status[
len] =
'\0';
267 (
errcode(ERRCODE_ADMIN_SHUTDOWN),
268 errmsg(
"canceling the wait for synchronous replication and terminating connection due to administrator command"),
269 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
285 (
errmsg(
"canceling wait for synchronous replication due to user request"),
286 errdetail(
"The transaction has already committed locally, but might not have been replicated to the standby.")));
427 (
errmsg_internal(
"standby \"%s\" now has synchronous standby priority %u",
494 (
errmsg(
"standby \"%s\" is now a synchronous standby with priority %u",
498 (
errmsg(
"standby \"%s\" is now a candidate for quorum synchronous standby",
506 if (!got_recptr || !am_sync)
535 elog(
DEBUG3,
"released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
573 for (
i = 0;
i < num_standbys;
i++)
575 if (sync_standbys[
i].is_me)
587 num_standbys < SyncRepConfig->num_sync)
589 pfree(sync_standbys);
609 sync_standbys, num_standbys);
614 sync_standbys, num_standbys,
618 pfree(sync_standbys);
639 for (
i = 0;
i < num_standbys;
i++)
672 Assert(nth > 0 && nth <= num_standbys);
678 for (
i = 0;
i < num_standbys;
i++)
680 write_array[
i] = sync_standbys[
i].
write;
681 flush_array[
i] = sync_standbys[
i].
flush;
682 apply_array[
i] = sync_standbys[
i].
apply;
691 *writePtr = write_array[nth - 1];
692 *flushPtr = flush_array[nth - 1];
693 *applyPtr = apply_array[nth - 1];
711 else if (lsn1 == lsn2)
748 stby = *standbys + n;
833 const char *standby_name;
851 strcmp(standby_name,
"*") == 0)
856 standby_name += strlen(standby_name) + 1;
960 if (!sync_standbys_defined)
981 #ifdef USE_ASSERT_CHECKING
983 SyncRepQueueIsOrderedByLSN(
int mode)
1051 GUC_check_errmsg(
"number of synchronous standbys (%d) must be greater than zero",
1063 *extra = (
void *) pconf;
#define pg_read_barrier()
#define pg_write_barrier()
#define offsetof(type, field)
elog(ERROR, "%s: %s", p2, msg)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 InterruptHoldoffCount
volatile sig_atomic_t QueryCancelPending
volatile sig_atomic_t ProcDiePending
void GUC_check_errcode(int sqlerrcode)
#define GUC_check_errdetail
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
bool LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
void pfree(void *pointer)
static PgChecksumMode mode
static rewind_source * source
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
CommandDest whereToSendOutput
const char * get_ps_display(int *displen)
bool update_process_title
void set_ps_display(const char *activity)
Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
void SHMQueueDelete(SHM_QUEUE *queue)
void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset)
bool SHMQueueIsDetached(const SHM_QUEUE *queue)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char member_names[FLEXIBLE_ARRAY_MEMBER]
int sync_standby_priority
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
bool sync_standbys_defined
int sync_standby_priority
static int SyncRepWaitMode
void SyncRepInitConfig(void)
void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync)
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys, uint8 nth)
void assign_synchronous_commit(int newval, void *extra)
void assign_synchronous_standby_names(const char *newval, void *extra)
static int standby_priority_comparator(const void *a, const void *b)
static int SyncRepWakeQueue(bool all, int mode)
SyncRepConfigData * SyncRepConfig
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
void SyncRepReleaseWaiters(void)
void SyncRepUpdateSyncStandbysDefined(void)
static bool announce_next_takeover
static int SyncRepGetStandbyPriority(void)
char * SyncRepStandbyNames
static void SyncRepQueueInsert(int mode)
static void SyncRepCancelWait(void)
bool check_synchronous_standby_names(char **newval, void **extra, GucSource source)
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, SyncRepStandbyData *sync_standbys, int num_standbys)
void SyncRepCleanupAtProcExit(void)
static int cmp_lsn(const void *a, const void *b)
#define SyncStandbysDefined()
#define SYNC_REP_PRIORITY
#define NUM_SYNC_REP_WAIT_MODE
#define SyncRepRequested()
void syncrep_scanner_finish(void)
void syncrep_scanner_init(const char *query_string)
#define SYNC_REP_WAIT_WRITE
int syncrep_yyparse(void)
PGDLLIMPORT SyncRepConfigData * syncrep_parse_result
#define SYNC_REP_WAIT_COMPLETE
#define SYNC_REP_WAIT_FLUSH
PGDLLIMPORT char * syncrep_parse_error_msg
#define SYNC_REP_NOT_WAITING
#define SYNC_REP_WAIT_APPLY
bool am_cascading_walsender
WalSndCtlData * WalSndCtl
@ SYNCHRONOUS_COMMIT_REMOTE_WRITE
@ SYNCHRONOUS_COMMIT_REMOTE_APPLY
@ SYNCHRONOUS_COMMIT_REMOTE_FLUSH
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr