PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiverfuncs.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
17#include "postgres.h"
18
19#include <sys/stat.h>
20#include <sys/time.h>
21#include <time.h>
22#include <unistd.h>
23#include <signal.h>
24
26#include "access/xlogrecovery.h"
27#include "pgstat.h"
29#include "storage/pmsignal.h"
30#include "storage/proc.h"
31#include "storage/shmem.h"
32#include "utils/timestamp.h"
33#include "utils/wait_event.h"
34
36
37/*
38 * How long to wait for walreceiver to start up after requesting
39 * postmaster to launch it. In seconds.
 *
 * WalRcvRunning() and WalRcvStreaming() compare this against
 * (pg_time_t) time(NULL) - walrcv->startTime while the state is
 * WALRCV_STARTING; once it is exceeded they force the state to
 * WALRCV_STOPPED.  RequestXLogStreaming() sets startTime from time(NULL).
40 */
41#define WALRCV_STARTUP_TIMEOUT 10
42
43/*
 * Report shared memory space needed by WalRcvShmemInit.
 *
 * The result is sizeof(WalRcvData), accumulated with add_size(); it is the
 * size WalRcvShmemInit() passes to ShmemInitStruct().
 *
 * NOTE(review): the function-name line between "Size" and "{" is missing
 * from this listing.  The symbol index lists "Size WalRcvShmemSize(void)"
 * and WalRcvShmemInit() calls WalRcvShmemSize(), so this is presumably that
 * function -- confirm against the upstream file.
 */
44Size
46{
47 Size size = 0;
48
49 size = add_size(size, sizeof(WalRcvData));
50
51 return size;
52}
53
54/*
 * Allocate and initialize walreceiver-related shared memory.
 *
 * Points the global WalRcv at the "Wal Receiver Ctl" shared-memory struct,
 * sized by WalRcvShmemSize().  ShmemInitStruct() reports through 'found'
 * whether the struct already existed; the first-time initialization branch
 * runs only when it did not.
 *
 * NOTE(review): this listing drops the function-name line and every
 * statement inside the first-time initialization branch.  The symbol index
 * references MemSet, ConditionVariableInit, SpinLockInit,
 * pg_atomic_init_u64 and INVALID_PROC_NUMBER, which presumably belong to
 * that branch -- restore it from the upstream file before relying on this
 * copy.
 */
55void
57{
58 bool found;
59
60 WalRcv = (WalRcvData *)
61 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
62
63 if (!found)
64 {
65 /* First time through, so initialize */
		/* NOTE(review): the initialization statements are missing here. */
72 }
73}
74
75/*
 * Is walreceiver running (or starting up)?
 *
 * Takes a snapshot of walRcvState and startTime under walrcv->mutex.
 * Returns false only when the resulting state is WALRCV_STOPPED.
 *
 * This is not a pure query.  If the walreceiver has been WALRCV_STARTING
 * for more than WALRCV_STARTUP_TIMEOUT seconds since startTime, the shared
 * state is forced to WALRCV_STOPPED.  Waiters on walRcvStoppedCV are then
 * woken.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - the function-name line;
 *  - the local declarations of 'walrcv' and 'state';
 *  - the condition that guards the timeout block below.
 * The symbol index lists "bool WalRcvRunning(void)".  Confirm the missing
 * lines against the upstream file.
 *
 * NOTE(review): elapsed time is measured with time(NULL), i.e. wall-clock
 * time.  A step of the system clock could therefore shorten or lengthen
 * the effective startup timeout.
 */
76bool
78{
81 pg_time_t startTime;
82
83 SpinLockAcquire(&walrcv->mutex);
84
85 state = walrcv->walRcvState;
86 startTime = walrcv->startTime;
87
88 SpinLockRelease(&walrcv->mutex);
89
90 /*
91 * If it has taken too long for walreceiver to start up, give up. Setting
92 * the state to STOPPED ensures that if walreceiver later does start up
93 * after all, it will see that it's not supposed to be running and die
94 * without doing anything.
 *
 * NOTE(review): the condition line guarding the block below is missing
 * from this listing.  WalRcvStreaming() uses "if (state == WALRCV_STARTING)"
 * at the same point.
95 */
97 {
98 pg_time_t now = (pg_time_t) time(NULL);
99
100 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
101 {
102 bool stopped = false;
103
			/*
			 * Re-check under the lock.  The state may have changed after the
			 * snapshot above was taken.  Only a walreceiver that is still
			 * WALRCV_STARTING is forced to WALRCV_STOPPED.
			 */
104 SpinLockAcquire(&walrcv->mutex);
105 if (walrcv->walRcvState == WALRCV_STARTING)
106 {
107 state = walrcv->walRcvState = WALRCV_STOPPED;
108 stopped = true;
109 }
110 SpinLockRelease(&walrcv->mutex);
111
			/* Wake waiters (e.g. ShutdownWalRcv) after releasing the spinlock. */
112 if (stopped)
113 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
114 }
115 }
116
117 if (state != WALRCV_STOPPED)
118 return true;
119 else
120 return false;
121}
122
123/*
 * Return the state of the walreceiver.
 *
 * Reads walrcv->walRcvState under walrcv->mutex and returns that snapshot.
 * Unlike WalRcvRunning(), it applies no startup timeout and never writes
 * the shared state.
 *
 * NOTE(review): the return-type line, the function-name line and the local
 * declarations ('walrcv', 'state') are missing from this listing.  The
 * symbol index lists "WalRcvState WalRcvGetState(void)".  Confirm against
 * the upstream file.
 */
126{
129
130 SpinLockAcquire(&walrcv->mutex);
131 state = walrcv->walRcvState;
132 SpinLockRelease(&walrcv->mutex);
133
134 return state;
135}
136
137/*
138 * Is walreceiver running and streaming (or at least attempting to connect,
139 * or starting up)?
 *
 * This first applies the same WALRCV_STARTUP_TIMEOUT check as
 * WalRcvRunning().  A walreceiver that has stayed in WALRCV_STARTING too
 * long is forced to WALRCV_STOPPED, and walRcvStoppedCV is broadcast.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - the function-name line;
 *  - the local declarations of 'walrcv' and 'state';
 *  - the state test that decides the return value (just before
 *    "return true").
 * The symbol index lists "bool WalRcvStreaming(void)".  Confirm the exact
 * set of accepted states against the upstream file.
 *
 * NOTE(review): the startup-timeout block below duplicates the one in
 * WalRcvRunning().  A shared static helper would keep the two in sync.
140 */
141bool
143{
146 pg_time_t startTime;
147
148 SpinLockAcquire(&walrcv->mutex);
149
150 state = walrcv->walRcvState;
151 startTime = walrcv->startTime;
152
153 SpinLockRelease(&walrcv->mutex);
154
155 /*
156 * If it has taken too long for walreceiver to start up, give up. Setting
157 * the state to STOPPED ensures that if walreceiver later does start up
158 * after all, it will see that it's not supposed to be running and die
159 * without doing anything.
160 */
161 if (state == WALRCV_STARTING)
162 {
163 pg_time_t now = (pg_time_t) time(NULL);
164
165 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
166 {
167 bool stopped = false;
168
			/* Re-check under the lock; the state may have moved on since the snapshot. */
169 SpinLockAcquire(&walrcv->mutex);
170 if (walrcv->walRcvState == WALRCV_STARTING)
171 {
172 state = walrcv->walRcvState = WALRCV_STOPPED;
173 stopped = true;
174 }
175 SpinLockRelease(&walrcv->mutex);
176
177 if (stopped)
178 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
179 }
180 }
181
	/*
	 * NOTE(review): the state test that controls this return is missing from
	 * this listing.
	 */
184 return true;
185 else
186 return false;
187}
188
189/*
190 * Stop walreceiver (if running) and wait for it to die.
191 * Executed by the Startup process.
 *
 * Under walrcv->mutex:
 *  - a walreceiver still in WALRCV_STARTING is marked WALRCV_STOPPED
 *    directly;
 *  - WALRCV_STREAMING and WALRCV_WAITING are moved to WALRCV_STOPPING;
 *  - for those states and for WALRCV_STOPPING, walrcv->pid is captured so
 *    the process can be signalled after the lock is released.
 * The function then sleeps on walRcvStoppedCV until WalRcvRunning() returns
 * false.
 *
 * NOTE(review): several lines are missing from this listing:
 *  - the function-name line;
 *  - the 'walrcv' declaration;
 *  - some case labels of the switch and its fall-through marker;
 *  - the statement controlled by "if (walrcvpid != 0)";
 *  - the tail of the ConditionVariableSleep() call.
 * The symbol index references WALRCV_CONNECTING, WALRCV_RESTARTING,
 * pg_fallthrough, kill() and ConditionVariableCancelSleep(), which
 * presumably fill those gaps.  Read literally, states with no case label
 * here would leave walrcvpid at 0, so no signal would be sent before the
 * wait loop.  Restore this function from the upstream file.
192 */
193void
195{
197 pid_t walrcvpid = 0;
198 bool stopped = false;
199
200 /*
201 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
202 * mode once it's finished, and will also request postmaster to not
203 * restart itself.
204 */
205 SpinLockAcquire(&walrcv->mutex);
206 switch (walrcv->walRcvState)
207 {
208 case WALRCV_STOPPED:
209 break;
210 case WALRCV_STARTING:
211 walrcv->walRcvState = WALRCV_STOPPED;
212 stopped = true;
213 break;
214
216 case WALRCV_STREAMING:
217 case WALRCV_WAITING:
219 walrcv->walRcvState = WALRCV_STOPPING;
			/* Fall through: like WALRCV_STOPPING, capture the pid to signal. */
221 case WALRCV_STOPPING:
222 walrcvpid = walrcv->pid;
223 break;
224 }
225 SpinLockRelease(&walrcv->mutex);
226
227 /* Unnecessary but consistent: WalRcvRunning() also broadcasts after forcing STARTING -> STOPPED. */
228 if (stopped)
229 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
230
231 /*
232 * Signal walreceiver process if it was still running.
233 */
234 if (walrcvpid != 0)
	/*
	 * NOTE(review): the statement that signals walrcvpid is missing from this
	 * listing.  Read literally, this "if" would instead control the
	 * ConditionVariablePrepareToSleep() call below.
	 */
236
237 /*
238 * Wait for walreceiver to acknowledge its death by setting state to
239 * WALRCV_STOPPED.
240 */
241 ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
242 while (WalRcvRunning())
243 ConditionVariableSleep(&walrcv->walRcvStoppedCV,
	/*
	 * NOTE(review): the remaining argument of this call and the following
	 * line(s) are missing from this listing.
	 */
246}
247
248/*
249 * Request postmaster to start walreceiver.
250 *
251 * "recptr" indicates the position where streaming should begin. "conninfo"
252 * is a libpq connection string to use. "slotname" is, optionally, the name
253 * of a replication slot to acquire. "create_temp_slot" indicates to create
254 * a temporary slot when no "slotname" is given.
255 *
256 * WAL receivers do not directly load GUC parameters used for the connection
257 * to the primary, and rely on the values passed down by the caller of this
258 * routine instead. Hence, the addition of any new parameters should happen
259 * through this code path.
 *
 * Under walrcv->mutex this function:
 *  - stores the connection parameters in shared memory;
 *  - moves the state from WALRCV_STOPPED to WALRCV_STARTING and sets
 *    'launch', or otherwise (WALRCV_WAITING, per the Assert) to
 *    WALRCV_RESTARTING;
 *  - stamps startTime with time(NULL), the reference point for
 *    WALRCV_STARTUP_TIMEOUT;
 *  - records recptr/tli as the new receive start;
 *  - copies walrcv->procno into walrcv_proc for use after the lock is
 *    released.
 *
 * conninfo and slotname are copied with strlcpy() into buffers of
 * MAXCONNINFO and NAMEDATALEN bytes, so longer values are silently
 * truncated.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - the function-name line;
 *  - the declarations of 'walrcv' and 'walrcv_proc';
 *  - the code after "if (launch)".
 * The symbol index lists "void RequestXLogStreaming(TimeLineID tli,
 * XLogRecPtr recptr, const char *conninfo, const char *slotname,
 * bool create_temp_slot)".
260 */
261void
263 const char *slotname, bool create_temp_slot)
264{
266 bool launch = false;
267 pg_time_t now = (pg_time_t) time(NULL);
269
270 /*
271 * We always start at the beginning of the segment. That prevents a broken
272 * segment (i.e., with no records in the first half of a segment) from
273 * being created by XLOG streaming, which might cause trouble later on if
274 * the segment is e.g. archived.
 *
 * NOTE(review): the statement(s) that round recptr down to the segment
 * start are missing from this listing.  The symbol index references
 * XLogSegmentOffset() and wal_segment_size, presumably used there.
275 */
278
279 SpinLockAcquire(&walrcv->mutex);
280
281 /* It better be stopped, or idle in WALRCV_WAITING, if we try to restart it */
282 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
283 walrcv->walRcvState == WALRCV_WAITING);
284
285 if (conninfo != NULL)
286 strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
287 else
288 walrcv->conninfo[0] = '\0';
289
290 /*
291 * Use configured replication slot if present, and ignore the value of
292 * create_temp_slot as the slot name should be persistent. Otherwise, use
293 * create_temp_slot to determine whether this WAL receiver should create a
294 * temporary slot by itself and use it, or not.
295 */
296 if (slotname != NULL && slotname[0] != '\0')
297 {
298 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
299 walrcv->is_temp_slot = false;
300 }
301 else
302 {
303 walrcv->slotname[0] = '\0';
304 walrcv->is_temp_slot = create_temp_slot;
305 }
306
307 if (walrcv->walRcvState == WALRCV_STOPPED)
308 {
309 launch = true;
310 walrcv->walRcvState = WALRCV_STARTING;
311 }
312 else
313 walrcv->walRcvState = WALRCV_RESTARTING;
314 walrcv->startTime = now;
315
316 /*
317 * If this is the first startup of walreceiver (on this timeline),
318 * initialize flushedUpto, receivedTLI and latestChunkStart to the
 * starting point.
319 */
320 if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
321 {
322 walrcv->flushedUpto = recptr;
323 walrcv->receivedTLI = tli;
324 walrcv->latestChunkStart = recptr;
325 }
326 walrcv->receiveStart = recptr;
327 walrcv->receiveStartTLI = tli;
328
329 walrcv_proc = walrcv->procno;
330
331 SpinLockRelease(&walrcv->mutex);
332
333 if (launch)
	/*
	 * NOTE(review): the statements acting on 'launch' and 'walrcv_proc' are
	 * missing from this listing; as shown, this "if" has no body.  The
	 * symbol index references SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER),
	 * SetLatch(), GetPGProcByNumber() and INVALID_PROC_NUMBER.
	 */
337}
338
339/*
340 * Returns the last+1 byte position that walreceiver has flushed.
341 *
342 * Optionally, returns the previous chunk start, that is the first byte
343 * written in the most recent walreceiver flush cycle. Callers not
344 * interested in that value may pass NULL for latestChunkStart. Same for
345 * receiveTLI.
 *
 * All returned values are read during a single hold of walrcv->mutex, so
 * they form one consistent snapshot.  When requested, *receiveTLI is set
 * from walrcv->receivedTLI.
 *
 * NOTE(review): the signature lines and the local declarations ('walrcv',
 * 'recptr') are missing from this listing.  The symbol index lists
 * "XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart,
 * TimeLineID *receiveTLI)".
346 */
349{
352
353 SpinLockAcquire(&walrcv->mutex);
354 recptr = walrcv->flushedUpto;
355 if (latestChunkStart)
356 *latestChunkStart = walrcv->latestChunkStart;
357 if (receiveTLI)
358 *receiveTLI = walrcv->receivedTLI;
359 SpinLockRelease(&walrcv->mutex);
360
361 return recptr;
362}
363
364/*
365 * Returns the last+1 byte position that walreceiver has written.
366 * This returns a recently written value without taking a lock.
 *
 * The value comes from pg_atomic_read_u64() on walrcv->writtenUpto.  It is
 * not read in the same critical section as the fields returned by
 * GetWalRcvFlushRecPtr(), so callers should not treat the two results as a
 * consistent pair.
 *
 * NOTE(review): the signature lines and the 'walrcv' declaration are
 * missing from this listing.  The symbol index lists
 * "XLogRecPtr GetWalRcvWriteRecPtr(void)".
367 */
370{
372
373 return pg_atomic_read_u64(&walrcv->writtenUpto);
374}
375
376/*
377 * Returns the replication apply delay in milliseconds, or -1
378 * if the apply delay info is not available.
 *
 * NOTE(review): everything after the return type is missing from this
 * listing: the function-name line and the whole body.  The symbol index
 * lists "int GetReplicationApplyDelay(void)".  It also references
 * GetXLogReplayRecPtr(), GetCurrentChunkReplayStartTime() and
 * GetCurrentTimestamp(), which no visible code uses, so they presumably
 * belong to this body.  Restore it from the upstream file.
379 */
380int
405
406/*
407 * Returns the network latency in ms, note that this includes any
408 * difference in clock settings between the servers, as well as timezone.
 *
 * The result is TimestampDifferenceMilliseconds(lastMsgSendTime,
 * lastMsgReceiptTime), computed from the two timestamps stored in shared
 * memory.  Both are read during one hold of walrcv->mutex.
 *
 * NOTE(review): TimestampDifferenceMilliseconds() is declared to return
 * long, while this function returns int.  Confirm that the helper bounds
 * its result to int range; otherwise the implicit conversion may truncate.
 *
 * NOTE(review): the function-name line and the 'walrcv' declaration are
 * missing from this listing.  The symbol index lists
 * "int GetReplicationTransferLatency(void)".
409 */
410int
412{
414 TimestampTz lastMsgSendTime;
415 TimestampTz lastMsgReceiptTime;
416
417 SpinLockAcquire(&walrcv->mutex);
418 lastMsgSendTime = walrcv->lastMsgSendTime;
419 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 SpinLockRelease(&walrcv->mutex);
421
422 return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 lastMsgReceiptTime);
424}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition atomics.h:467
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1748
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1600
#define Assert(condition)
Definition c.h:945
#define pg_fallthrough
Definition c.h:152
#define MemSet(start, val, len)
Definition c.h:1109
size_t Size
Definition c.h:691
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
void SetLatch(Latch *latch)
Definition latch.c:290
#define NAMEDATALEN
int64 pg_time_t
Definition pgtime.h:23
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:165
@ PMSIGNAL_START_WALRECEIVER
Definition pmsignal.h:42
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
static int fb(int x)
#define GetPGProcByNumber(n)
Definition proc.h:501
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
Size add_size(Size s1, Size s2)
Definition shmem.c:485
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ProcNumber procno
Definition walreceiver.h:68
ConditionVariable walRcvStoppedCV
Definition walreceiver.h:73
pg_atomic_uint64 writtenUpto
WalRcvState walRcvState
Definition walreceiver.h:72
slock_t mutex
#define MAXCONNINFO
Definition walreceiver.h:37
WalRcvState
Definition walreceiver.h:46
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool WalRcvStreaming(void)
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
WalRcvData * WalRcv
XLogRecPtr GetWalRcvWriteRecPtr(void)
void ShutdownWalRcv(void)
#define WALRCV_STARTUP_TIMEOUT
WalRcvState WalRcvGetState(void)
bool WalRcvRunning(void)
int GetReplicationApplyDelay(void)
void WalRcvShmemInit(void)
Size WalRcvShmemSize(void)
int GetReplicationTransferLatency(void)
#define kill(pid, sig)
Definition win32_port.h:490
int wal_segment_size
Definition xlog.c:147
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)