PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

Size WalRcvShmemSize (void)
 
void WalRcvShmemInit (void)
 
bool WalRcvRunning (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
 
XLogRecPtr GetWalRcvFlushRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
XLogRecPtr GetWalRcvWriteRecPtr (void)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvDataWalRcv = NULL
 

Macro Definition Documentation

◆ WALRCV_STARTUP_TIMEOUT

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 40 of file walreceiverfuncs.c.

Function Documentation

◆ GetReplicationApplyDelay()

int GetReplicationApplyDelay ( void  )

Definition at line 365 of file walreceiverfuncs.c.

366{
367 WalRcvData *walrcv = WalRcv;
368 XLogRecPtr receivePtr;
369 XLogRecPtr replayPtr;
370 TimestampTz chunkReplayStartTime;
371
372 SpinLockAcquire(&walrcv->mutex);
373 receivePtr = walrcv->flushedUpto;
374 SpinLockRelease(&walrcv->mutex);
375
376 replayPtr = GetXLogReplayRecPtr(NULL);
377
378 if (receivePtr == replayPtr)
379 return 0;
380
381 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
382
383 if (chunkReplayStartTime == 0)
384 return -1;
385
386 return TimestampDifferenceMilliseconds(chunkReplayStartTime,
388}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
int64 TimestampTz
Definition: timestamp.h:39
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
XLogRecPtr flushedUpto
Definition: walreceiver.h:96
slock_t mutex
Definition: walreceiver.h:147
WalRcvData * WalRcv
uint64 XLogRecPtr
Definition: xlogdefs.h:21
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References WalRcvData::flushedUpto, GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetReplicationTransferLatency()

int GetReplicationTransferLatency ( void  )

Definition at line 395 of file walreceiverfuncs.c.

396{
397 WalRcvData *walrcv = WalRcv;
398 TimestampTz lastMsgSendTime;
399 TimestampTz lastMsgReceiptTime;
400
401 SpinLockAcquire(&walrcv->mutex);
402 lastMsgSendTime = walrcv->lastMsgSendTime;
403 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
404 SpinLockRelease(&walrcv->mutex);
405
406 return TimestampDifferenceMilliseconds(lastMsgSendTime,
407 lastMsgReceiptTime);
408}
TimestampTz lastMsgReceiptTime
Definition: walreceiver.h:111
TimestampTz lastMsgSendTime
Definition: walreceiver.h:110

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetWalRcvFlushRecPtr()

XLogRecPtr GetWalRcvFlushRecPtr ( XLogRecPtr latestChunkStart,
TimeLineID receiveTLI 
)

Definition at line 332 of file walreceiverfuncs.c.

333{
334 WalRcvData *walrcv = WalRcv;
335 XLogRecPtr recptr;
336
337 SpinLockAcquire(&walrcv->mutex);
338 recptr = walrcv->flushedUpto;
339 if (latestChunkStart)
340 *latestChunkStart = walrcv->latestChunkStart;
341 if (receiveTLI)
342 *receiveTLI = walrcv->receivedTLI;
343 SpinLockRelease(&walrcv->mutex);
344
345 return recptr;
346}
TimeLineID receivedTLI
Definition: walreceiver.h:97
XLogRecPtr latestChunkStart
Definition: walreceiver.h:105
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263

References WalRcvData::flushedUpto, WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, receiveTLI, SpinLockAcquire, SpinLockRelease, and WalRcv.

Referenced by CreateRestartPoint(), GetLatestLSN(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), reserve_wal_for_local_slot(), and WaitForWALToBecomeAvailable().

◆ GetWalRcvWriteRecPtr()

XLogRecPtr GetWalRcvWriteRecPtr ( void  )

Definition at line 353 of file walreceiverfuncs.c.

354{
355 WalRcvData *walrcv = WalRcv;
356
357 return pg_atomic_read_u64(&walrcv->writtenUpto);
358}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
pg_atomic_uint64 writtenUpto
Definition: walreceiver.h:155

References pg_atomic_read_u64(), WalRcv, and WalRcvData::writtenUpto.

◆ RequestXLogStreaming()

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo,
const char *  slotname,
bool  create_temp_slot 
)

Definition at line 246 of file walreceiverfuncs.c.

248{
249 WalRcvData *walrcv = WalRcv;
250 bool launch = false;
251 pg_time_t now = (pg_time_t) time(NULL);
252 ProcNumber walrcv_proc;
253
254 /*
255 * We always start at the beginning of the segment. That prevents a broken
256 * segment (i.e., with no records in the first half of a segment) from
257 * being created by XLOG streaming, which might cause trouble later on if
258 * the segment is e.g archived.
259 */
260 if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
261 recptr -= XLogSegmentOffset(recptr, wal_segment_size);
262
263 SpinLockAcquire(&walrcv->mutex);
264
265 /* It better be stopped if we try to restart it */
266 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
267 walrcv->walRcvState == WALRCV_WAITING);
268
269 if (conninfo != NULL)
270 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
271 else
272 walrcv->conninfo[0] = '\0';
273
274 /*
275 * Use configured replication slot if present, and ignore the value of
276 * create_temp_slot as the slot name should be persistent. Otherwise, use
277 * create_temp_slot to determine whether this WAL receiver should create a
278 * temporary slot by itself and use it, or not.
279 */
280 if (slotname != NULL && slotname[0] != '\0')
281 {
282 strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
283 walrcv->is_temp_slot = false;
284 }
285 else
286 {
287 walrcv->slotname[0] = '\0';
288 walrcv->is_temp_slot = create_temp_slot;
289 }
290
291 if (walrcv->walRcvState == WALRCV_STOPPED)
292 {
293 launch = true;
295 }
296 else
298 walrcv->startTime = now;
299
300 /*
301 * If this is the first startup of walreceiver (on this timeline),
302 * initialize flushedUpto and latestChunkStart to the starting point.
303 */
304 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
305 {
306 walrcv->flushedUpto = recptr;
307 walrcv->receivedTLI = tli;
308 walrcv->latestChunkStart = recptr;
309 }
310 walrcv->receiveStart = recptr;
311 walrcv->receiveStartTLI = tli;
312
313 walrcv_proc = walrcv->procno;
314
315 SpinLockRelease(&walrcv->mutex);
316
317 if (launch)
319 else if (walrcv_proc != INVALID_PROC_NUMBER)
320 SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
321}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
#define Assert(condition)
Definition: c.h:812
void SetLatch(Latch *latch)
Definition: latch.c:632
#define NAMEDATALEN
int64 pg_time_t
Definition: pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
@ PMSIGNAL_START_WALRECEIVER
Definition: pmsignal.h:41
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define GetPGProcByNumber(n)
Definition: proc.h:436
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
TimeLineID receiveStartTLI
Definition: walreceiver.h:87
char slotname[NAMEDATALEN]
Definition: walreceiver.h:136
XLogRecPtr receiveStart
Definition: walreceiver.h:86
ProcNumber procno
Definition: walreceiver.h:67
bool is_temp_slot
Definition: walreceiver.h:142
pg_time_t startTime
Definition: walreceiver.h:78
WalRcvState walRcvState
Definition: walreceiver.h:71
char conninfo[MAXCONNINFO]
Definition: walreceiver.h:123
#define MAXCONNINFO
Definition: walreceiver.h:37
@ WALRCV_STARTING
Definition: walreceiver.h:48
@ WALRCV_STOPPED
Definition: walreceiver.h:47
@ WALRCV_RESTARTING
Definition: walreceiver.h:52
@ WALRCV_WAITING
Definition: walreceiver.h:51
int wal_segment_size
Definition: xlog.c:143
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

References Assert, WalRcvData::conninfo, WalRcvData::flushedUpto, GetPGProcByNumber, INVALID_PROC_NUMBER, WalRcvData::is_temp_slot, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, NAMEDATALEN, now(), PMSIGNAL_START_WALRECEIVER, WalRcvData::procno, WalRcvData::receivedTLI, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), WalRcvData::slotname, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable().

◆ ShutdownWalRcv()

void ShutdownWalRcv ( void  )

Definition at line 179 of file walreceiverfuncs.c.

180{
181 WalRcvData *walrcv = WalRcv;
182 pid_t walrcvpid = 0;
183 bool stopped = false;
184
185 /*
186 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187 * mode once it's finished, and will also request postmaster to not
188 * restart itself.
189 */
190 SpinLockAcquire(&walrcv->mutex);
191 switch (walrcv->walRcvState)
192 {
193 case WALRCV_STOPPED:
194 break;
195 case WALRCV_STARTING:
196 walrcv->walRcvState = WALRCV_STOPPED;
197 stopped = true;
198 break;
199
200 case WALRCV_STREAMING:
201 case WALRCV_WAITING:
204 /* fall through */
205 case WALRCV_STOPPING:
206 walrcvpid = walrcv->pid;
207 break;
208 }
209 SpinLockRelease(&walrcv->mutex);
210
211 /* Unnecessary but consistent. */
212 if (stopped)
214
215 /*
216 * Signal walreceiver process if it was still running.
217 */
218 if (walrcvpid != 0)
219 kill(walrcvpid, SIGTERM);
220
221 /*
222 * Wait for walreceiver to acknowledge its death by setting state to
223 * WALRCV_STOPPED.
224 */
226 while (WalRcvRunning())
228 WAIT_EVENT_WAL_RECEIVER_EXIT);
230}
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
pid_t pid
Definition: walreceiver.h:68
ConditionVariable walRcvStoppedCV
Definition: walreceiver.h:72
@ WALRCV_STREAMING
Definition: walreceiver.h:50
@ WALRCV_STOPPING
Definition: walreceiver.h:53
bool WalRcvRunning(void)
#define kill(pid, sig)
Definition: win32_port.h:503

References ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), kill, WalRcvData::mutex, WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by XLogShutdownWalRcv().

◆ WalRcvRunning()

bool WalRcvRunning ( void  )

Definition at line 76 of file walreceiverfuncs.c.

77{
78 WalRcvData *walrcv = WalRcv;
80 pg_time_t startTime;
81
82 SpinLockAcquire(&walrcv->mutex);
83
84 state = walrcv->walRcvState;
85 startTime = walrcv->startTime;
86
87 SpinLockRelease(&walrcv->mutex);
88
89 /*
90 * If it has taken too long for walreceiver to start up, give up. Setting
91 * the state to STOPPED ensures that if walreceiver later does start up
92 * after all, it will see that it's not supposed to be running and die
93 * without doing anything.
94 */
96 {
97 pg_time_t now = (pg_time_t) time(NULL);
98
99 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 {
101 bool stopped = false;
102
103 SpinLockAcquire(&walrcv->mutex);
104 if (walrcv->walRcvState == WALRCV_STARTING)
105 {
106 state = walrcv->walRcvState = WALRCV_STOPPED;
107 stopped = true;
108 }
109 SpinLockRelease(&walrcv->mutex);
110
111 if (stopped)
113 }
114 }
115
116 if (state != WALRCV_STOPPED)
117 return true;
118 else
119 return false;
120}
Definition: regguts.h:323
WalRcvState
Definition: walreceiver.h:46
#define WALRCV_STARTUP_TIMEOUT

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by ShutdownWalRcv(), and StartupRequestWalReceiverRestart().

◆ WalRcvShmemInit()

void WalRcvShmemInit ( void  )

Definition at line 55 of file walreceiverfuncs.c.

56{
57 bool found;
58
59 WalRcv = (WalRcvData *)
60 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61
62 if (!found)
63 {
64 /* First time through, so initialize */
71 }
72}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
#define MemSet(start, val, len)
Definition: c.h:974
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
#define SpinLockInit(lock)
Definition: spin.h:57
Size WalRcvShmemSize(void)

References ConditionVariableInit(), INVALID_PROC_NUMBER, MemSet, WalRcvData::mutex, pg_atomic_init_u64(), WalRcvData::procno, ShmemInitStruct(), SpinLockInit, WalRcv, WALRCV_STOPPED, WalRcvShmemSize(), WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and WalRcvData::writtenUpto.

Referenced by CreateOrAttachShmemStructs().

◆ WalRcvShmemSize()

Size WalRcvShmemSize ( void  )

Definition at line 44 of file walreceiverfuncs.c.

45{
46 Size size = 0;
47
48 size = add_size(size, sizeof(WalRcvData));
49
50 return size;
51}
size_t Size
Definition: c.h:559
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), and size.

Referenced by CalculateShmemSize(), and WalRcvShmemInit().

◆ WalRcvStreaming()

bool WalRcvStreaming ( void  )

Definition at line 127 of file walreceiverfuncs.c.

128{
129 WalRcvData *walrcv = WalRcv;
131 pg_time_t startTime;
132
133 SpinLockAcquire(&walrcv->mutex);
134
135 state = walrcv->walRcvState;
136 startTime = walrcv->startTime;
137
138 SpinLockRelease(&walrcv->mutex);
139
140 /*
141 * If it has taken too long for walreceiver to start up, give up. Setting
142 * the state to STOPPED ensures that if walreceiver later does start up
143 * after all, it will see that it's not supposed to be running and die
144 * without doing anything.
145 */
146 if (state == WALRCV_STARTING)
147 {
148 pg_time_t now = (pg_time_t) time(NULL);
149
150 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
151 {
152 bool stopped = false;
153
154 SpinLockAcquire(&walrcv->mutex);
155 if (walrcv->walRcvState == WALRCV_STARTING)
156 {
157 state = walrcv->walRcvState = WALRCV_STOPPED;
158 stopped = true;
159 }
160 SpinLockRelease(&walrcv->mutex);
161
162 if (stopped)
164 }
165 }
166
169 return true;
170 else
171 return false;
172}

References ConditionVariableBroadcast(), WalRcvData::mutex, now(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, WALRCV_STREAMING, WalRcvData::walRcvState, and WalRcvData::walRcvStoppedCV.

Referenced by FinishWalRecovery(), and WaitForWALToBecomeAvailable().

Variable Documentation

◆ WalRcv