PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiverfuncs.c File Reference
#include "postgres.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "pgstat.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/subsystems.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Macros

#define WALRCV_STARTUP_TIMEOUT   10
 

Functions

static void WalRcvShmemRequest (void *arg)
 
static void WalRcvShmemInit (void *arg)
 
bool WalRcvRunning (void)
 
WalRcvState WalRcvGetState (void)
 
bool WalRcvStreaming (void)
 
void ShutdownWalRcv (void)
 
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
 
XLogRecPtr GetWalRcvFlushRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 
XLogRecPtr GetWalRcvWriteRecPtr (void)
 
int GetReplicationApplyDelay (void)
 
int GetReplicationTransferLatency (void)
 

Variables

WalRcvData * WalRcv = NULL
 
const ShmemCallbacks WalRcvShmemCallbacks
 

Macro Definition Documentation

◆ WALRCV_STARTUP_TIMEOUT

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 50 of file walreceiverfuncs.c.

Function Documentation

◆ GetReplicationApplyDelay()

int GetReplicationApplyDelay ( void  )

Definition at line 381 of file walreceiverfuncs.c.

382{
387
388 SpinLockAcquire(&walrcv->mutex);
389 receivePtr = walrcv->flushedUpto;
390 SpinLockRelease(&walrcv->mutex);
391
393
394 if (receivePtr == replayPtr)
395 return 0;
396
398
399 if (chunkReplayStartTime == 0)
400 return -1;
401
404}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
int64 TimestampTz
Definition timestamp.h:39
static int fb(int x)
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalRcvData * WalRcv
uint64 XLogRecPtr
Definition xlogdefs.h:21
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References fb(), GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), SpinLockAcquire(), SpinLockRelease(), TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetReplicationTransferLatency()

int GetReplicationTransferLatency ( void  )

Definition at line 411 of file walreceiverfuncs.c.

412{
414 TimestampTz lastMsgSendTime;
415 TimestampTz lastMsgReceiptTime;
416
417 SpinLockAcquire(&walrcv->mutex);
418 lastMsgSendTime = walrcv->lastMsgSendTime;
419 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 SpinLockRelease(&walrcv->mutex);
421
422 return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 lastMsgReceiptTime);
424}

References fb(), SpinLockAcquire(), SpinLockRelease(), TimestampDifferenceMilliseconds(), and WalRcv.

Referenced by ProcessWalSndrMessage().

◆ GetWalRcvFlushRecPtr()

XLogRecPtr GetWalRcvFlushRecPtr ( XLogRecPtr * latestChunkStart,
TimeLineID * receiveTLI 
)

Definition at line 348 of file walreceiverfuncs.c.

349{
352
353 SpinLockAcquire(&walrcv->mutex);
354 recptr = walrcv->flushedUpto;
355 if (latestChunkStart)
356 *latestChunkStart = walrcv->latestChunkStart;
357 if (receiveTLI)
358 *receiveTLI = walrcv->receivedTLI;
359 SpinLockRelease(&walrcv->mutex);
360
361 return recptr;
362}
static TimeLineID receiveTLI

References fb(), receiveTLI, SpinLockAcquire(), SpinLockRelease(), and WalRcv.

Referenced by CreateRestartPoint(), GetCurrentLSNForWaitType(), GetLatestLSN(), GetStandbyFlushRecPtr(), pg_last_wal_receive_lsn(), and WaitForWALToBecomeAvailable().

◆ GetWalRcvWriteRecPtr()

XLogRecPtr GetWalRcvWriteRecPtr ( void  )

Definition at line 369 of file walreceiverfuncs.c.

370{
372
373 return pg_atomic_read_u64(&walrcv->writtenUpto);
374}
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467

References fb(), pg_atomic_read_u64(), and WalRcv.

Referenced by GetCurrentLSNForWaitType().

◆ RequestXLogStreaming()

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char * conninfo,
const char * slotname,
bool  create_temp_slot 
)

Definition at line 261 of file walreceiverfuncs.c.

263{
265 bool launch = false;
266 pg_time_t now = (pg_time_t) time(NULL);
268
269 /*
270 * We always start at the beginning of the segment. That prevents a broken
271 * segment (i.e., with no records in the first half of a segment) from
272 * being created by XLOG streaming, which might cause trouble later on if
273 * the segment is e.g archived.
274 */
277
278 SpinLockAcquire(&walrcv->mutex);
279
280 /* It better be stopped if we try to restart it */
281 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
282 walrcv->walRcvState == WALRCV_WAITING);
283
284 if (conninfo != NULL)
285 strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
286 else
287 walrcv->conninfo[0] = '\0';
288
289 /*
290 * Use configured replication slot if present, and ignore the value of
291 * create_temp_slot as the slot name should be persistent. Otherwise, use
292 * create_temp_slot to determine whether this WAL receiver should create a
293 * temporary slot by itself and use it, or not.
294 */
295 if (slotname != NULL && slotname[0] != '\0')
296 {
297 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
298 walrcv->is_temp_slot = false;
299 }
300 else
301 {
302 walrcv->slotname[0] = '\0';
303 walrcv->is_temp_slot = create_temp_slot;
304 }
305
306 if (walrcv->walRcvState == WALRCV_STOPPED)
307 {
308 launch = true;
309 walrcv->walRcvState = WALRCV_STARTING;
310 }
311 else
312 walrcv->walRcvState = WALRCV_RESTARTING;
313 walrcv->startTime = now;
314
315 /*
316 * If this is the first startup of walreceiver (on this timeline),
317 * initialize flushedUpto and latestChunkStart to the starting point.
318 */
319 if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
320 {
321 walrcv->flushedUpto = recptr;
322 walrcv->receivedTLI = tli;
323 walrcv->latestChunkStart = recptr;
324 pg_atomic_write_u64(&walrcv->writtenUpto, recptr);
325 }
326 walrcv->receiveStart = recptr;
327 walrcv->receiveStartTLI = tli;
328
329 walrcv_proc = walrcv->procno;
330
331 SpinLockRelease(&walrcv->mutex);
332
333 if (launch)
337}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:485
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define Assert(condition)
Definition c.h:943
void SetLatch(Latch *latch)
Definition latch.c:290
#define NAMEDATALEN
int64 pg_time_t
Definition pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
@ PMSIGNAL_START_WALRECEIVER
Definition pmsignal.h:43
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
#define GetPGProcByNumber(n)
Definition proc.h:504
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
#define MAXCONNINFO
Definition walreceiver.h:37
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_WAITING
Definition walreceiver.h:52
int wal_segment_size
Definition xlog.c:150
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References Assert, fb(), GetPGProcByNumber, INVALID_PROC_NUMBER, MAXCONNINFO, NAMEDATALEN, now(), pg_atomic_write_u64(), PMSIGNAL_START_WALRECEIVER, SendPostmasterSignal(), SetLatch(), SpinLockAcquire(), SpinLockRelease(), strlcpy(), wal_segment_size, WalRcv, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, XLogRecPtrIsValid, and XLogSegmentOffset.

Referenced by WaitForWALToBecomeAvailable().

◆ ShutdownWalRcv()

void ShutdownWalRcv ( void  )

Definition at line 193 of file walreceiverfuncs.c.

194{
196 pid_t walrcvpid = 0;
197 bool stopped = false;
198
199 /*
200 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 * mode once it's finished, and will also request postmaster to not
202 * restart itself.
203 */
204 SpinLockAcquire(&walrcv->mutex);
205 switch (walrcv->walRcvState)
206 {
207 case WALRCV_STOPPED:
208 break;
209 case WALRCV_STARTING:
210 walrcv->walRcvState = WALRCV_STOPPED;
211 stopped = true;
212 break;
213
215 case WALRCV_STREAMING:
216 case WALRCV_WAITING:
218 walrcv->walRcvState = WALRCV_STOPPING;
220 case WALRCV_STOPPING:
221 walrcvpid = walrcv->pid;
222 break;
223 }
224 SpinLockRelease(&walrcv->mutex);
225
226 /* Unnecessary but consistent. */
227 if (stopped)
228 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
229
230 /*
231 * Signal walreceiver process if it was still running.
232 */
233 if (walrcvpid != 0)
235
236 /*
237 * Wait for walreceiver to acknowledge its death by setting state to
238 * WALRCV_STOPPED.
239 */
240 ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
241 while (WalRcvRunning())
242 ConditionVariableSleep(&walrcv->walRcvStoppedCV,
245}
#define pg_fallthrough
Definition c.h:161
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_STOPPING
Definition walreceiver.h:54
bool WalRcvRunning(void)
#define kill(pid, sig)
Definition win32_port.h:490

References ConditionVariableBroadcast(), ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), ConditionVariableSleep(), fb(), kill, pg_fallthrough, SpinLockAcquire(), SpinLockRelease(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, and WalRcvRunning().

Referenced by XLogShutdownWalRcv().

◆ WalRcvGetState()

WalRcvState WalRcvGetState ( void  )

Definition at line 124 of file walreceiverfuncs.c.

125{
128
129 SpinLockAcquire(&walrcv->mutex);
130 state = walrcv->walRcvState;
131 SpinLockRelease(&walrcv->mutex);
132
133 return state;
134}
WalRcvState
Definition walreceiver.h:46

References fb(), SpinLockAcquire(), SpinLockRelease(), and WalRcv.

Referenced by WaitForWALToBecomeAvailable().

◆ WalRcvRunning()

bool WalRcvRunning ( void  )

Definition at line 76 of file walreceiverfuncs.c.

77{
80 pg_time_t startTime;
81
82 SpinLockAcquire(&walrcv->mutex);
83
84 state = walrcv->walRcvState;
85 startTime = walrcv->startTime;
86
87 SpinLockRelease(&walrcv->mutex);
88
89 /*
90 * If it has taken too long for walreceiver to start up, give up. Setting
91 * the state to STOPPED ensures that if walreceiver later does start up
92 * after all, it will see that it's not supposed to be running and die
93 * without doing anything.
94 */
96 {
97 pg_time_t now = (pg_time_t) time(NULL);
98
99 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 {
101 bool stopped = false;
102
103 SpinLockAcquire(&walrcv->mutex);
104 if (walrcv->walRcvState == WALRCV_STARTING)
105 {
106 state = walrcv->walRcvState = WALRCV_STOPPED;
107 stopped = true;
108 }
109 SpinLockRelease(&walrcv->mutex);
110
111 if (stopped)
112 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 }
114 }
115
116 if (state != WALRCV_STOPPED)
117 return true;
118 else
119 return false;
120}
#define WALRCV_STARTUP_TIMEOUT

References ConditionVariableBroadcast(), fb(), now(), SpinLockAcquire(), SpinLockRelease(), WalRcv, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, and WALRCV_STOPPED.

Referenced by ShutdownWalRcv(), and StartupRequestWalReceiverRestart().

◆ WalRcvShmemInit()

static void WalRcvShmemInit ( void * arg)
static

Definition at line 64 of file walreceiverfuncs.c.

65{
66 MemSet(WalRcv, 0, sizeof(WalRcvData));
72}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
#define MemSet(start, val, len)
Definition c.h:1107
void ConditionVariableInit(ConditionVariable *cv)
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ProcNumber procno
Definition walreceiver.h:68
ConditionVariable walRcvStoppedCV
Definition walreceiver.h:73
pg_atomic_uint64 writtenUpto
WalRcvState walRcvState
Definition walreceiver.h:72
slock_t mutex

References ConditionVariableInit(), INVALID_PROC_NUMBER, MemSet, WalRcvData::mutex, pg_atomic_init_u64(), WalRcvData::procno, SpinLockInit(), WalRcv, WALRCV_STOPPED, WalRcvData::walRcvState, WalRcvData::walRcvStoppedCV, and WalRcvData::writtenUpto.

◆ WalRcvShmemRequest()

static void WalRcvShmemRequest ( void * arg)
static

Definition at line 54 of file walreceiverfuncs.c.

55{
56 ShmemRequestStruct(.name = "Wal Receiver Ctl",
57 .size = sizeof(WalRcvData),
58 .ptr = (void **) &WalRcv,
59 );
60}
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References name, ShmemRequestStruct, and WalRcv.

◆ WalRcvStreaming()

bool WalRcvStreaming ( void  )

Definition at line 141 of file walreceiverfuncs.c.

142{
145 pg_time_t startTime;
146
147 SpinLockAcquire(&walrcv->mutex);
148
149 state = walrcv->walRcvState;
150 startTime = walrcv->startTime;
151
152 SpinLockRelease(&walrcv->mutex);
153
154 /*
155 * If it has taken too long for walreceiver to start up, give up. Setting
156 * the state to STOPPED ensures that if walreceiver later does start up
157 * after all, it will see that it's not supposed to be running and die
158 * without doing anything.
159 */
160 if (state == WALRCV_STARTING)
161 {
162 pg_time_t now = (pg_time_t) time(NULL);
163
164 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 {
166 bool stopped = false;
167
168 SpinLockAcquire(&walrcv->mutex);
169 if (walrcv->walRcvState == WALRCV_STARTING)
170 {
171 state = walrcv->walRcvState = WALRCV_STOPPED;
172 stopped = true;
173 }
174 SpinLockRelease(&walrcv->mutex);
175
176 if (stopped)
177 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
178 }
179 }
180
183 return true;
184 else
185 return false;
186}

References ConditionVariableBroadcast(), fb(), now(), SpinLockAcquire(), SpinLockRelease(), WalRcv, WALRCV_CONNECTING, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STARTUP_TIMEOUT, WALRCV_STOPPED, and WALRCV_STREAMING.

Referenced by FinishWalRecovery(), and WaitForWALToBecomeAvailable().

Variable Documentation

◆ WalRcv

◆ WalRcvShmemCallbacks

const ShmemCallbacks WalRcvShmemCallbacks
Initial value:
= {
.request_fn = WalRcvShmemRequest,
.init_fn = WalRcvShmemInit,
}
static void WalRcvShmemInit(void *arg)
static void WalRcvShmemRequest(void *arg)

Definition at line 41 of file walreceiverfuncs.c.

41 {
42 .request_fn = WalRcvShmemRequest,
43 .init_fn = WalRcvShmemInit,
44};