PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walreceiver.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * walreceiver.h
4 * Exports from replication/walreceiverfuncs.c.
5 *
6 * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
7 *
8 * src/include/replication/walreceiver.h
9 *
10 *-------------------------------------------------------------------------
11 */
12#ifndef _WALRECEIVER_H
13#define _WALRECEIVER_H
14
15#include <netdb.h>
16
17#include "access/xlog.h"
18#include "access/xlogdefs.h"
19#include "pgtime.h"
20#include "port/atomics.h"
24#include "storage/spin.h"
25#include "utils/tuplestore.h"
26
27/* user-settable parameters */
31
32/*
33 * MAXCONNINFO: maximum size of a connection string.
34 *
35 * XXX: Should this move to pg_config_manual.h?
36 */
37#define MAXCONNINFO 1024
38
39/* Can we allow the standby to accept replication connection from another standby? */
40#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
41
42/*
43 * Values for WalRcv->walRcvState.
44 */
45typedef enum
46{
47 WALRCV_STOPPED, /* stopped and mustn't start up again */
48 WALRCV_STARTING, /* launched, but the process hasn't
49 * initialized yet */
50 WALRCV_CONNECTING, /* connecting to upstream server */
51 WALRCV_STREAMING, /* walreceiver is streaming */
52 WALRCV_WAITING, /* stopped streaming, waiting for orders */
53 WALRCV_RESTARTING, /* asked to restart streaming */
54 WALRCV_STOPPING, /* requested to stop, but still running */
56
57/* Shared memory area for management of walreceiver process */
58typedef struct
59{
60 /*
61 * Currently active walreceiver process's proc number and PID.
62 *
63 * The startup process uses the proc number to wake it up after telling it
64 * where to start streaming (after setting receiveStart and
65 * receiveStartTLI), and also to tell it to send apply feedback to the
66 * primary whenever specially marked commit records are applied.
67 */
70
71 /* Its current state */
74
75 /*
76 * Its start time (actually, the time at which it was requested to be
77 * started).
78 */
80
81 /*
82 * receiveStart and receiveStartTLI indicate the first byte position and
83 * timeline that will be received. When startup process starts the
84 * walreceiver, it sets these to the point where it wants the streaming to
85 * begin.
86 */
89
90 /*
91 * flushedUpto-1 is the last byte position that has already been received,
92 * and receivedTLI is the timeline it came from. At the first startup of
93 * walreceiver, these are set to receiveStart and receiveStartTLI. After
94 * that, walreceiver updates these whenever it flushes the received WAL to
95 * disk.
96 */
99
100 /*
101 * latestChunkStart is the starting byte position of the current "batch"
102 * of received WAL. It's actually the same as the previous value of
103 * flushedUpto before the last flush to disk. Startup process can use
104 * this to detect whether it's keeping up or not.
105 */
107
108 /*
109 * Time of send and receive of any message received.
110 */
113
114 /*
115 * Latest reported end of WAL on the sender
116 */
119
120 /*
121 * connection string; initially set to connect to the primary, and later
122 * clobbered to hide security-sensitive fields.
123 */
124 char conninfo[MAXCONNINFO];
125
126 /*
127 * Host name (this can be a host name, an IP address, or a directory path)
128 * and port number of the active replication connection.
129 */
130 char sender_host[NI_MAXHOST];
132
133 /*
134 * replication slot name; is also used for walreceiver to connect with the
135 * primary
136 */
137 char slotname[NAMEDATALEN];
138
139 /*
140 * If it's a temporary replication slot, it needs to be recreated when
141 * connecting.
142 */
144
145 /* set true once conninfo is ready to display (obfuscated pwds etc) */
147
148 slock_t mutex; /* locks shared variables shown above */
149
150 /*
151 * Like flushedUpto, but advanced after writing and before flushing,
152 * without the need to acquire the spin lock. Data can be read by another
153 * process up to this point, but shouldn't be used for data integrity
154 * purposes.
155 */
157
158 /*
159 * force walreceiver reply? This doesn't need to be locked; memory
160 * barriers for ordering are sufficient. But we do need atomic fetch and
161 * store semantics, so use sig_atomic_t.
162 */
163 sig_atomic_t force_reply; /* used as a bool */
164} WalRcvData;
165
167
168typedef struct
169{
170 bool logical; /* True if this is logical replication stream,
171 * false if physical stream. */
172 char *slotname; /* Name of the replication slot or NULL. */
173 XLogRecPtr startpoint; /* LSN of starting point. */
174
175 union
176 {
177 struct
178 {
179 TimeLineID startpointTLI; /* Starting timeline */
180 } physical;
181 struct
182 {
183 uint32 proto_version; /* Logical protocol version */
184 List *publication_names; /* String list of publications */
185 bool binary; /* Ask publisher to use binary */
186 char *streaming_str; /* Streaming of large transactions */
187 bool twophase; /* Streaming of two-phase transactions at
188 * prepare time */
189 char *origin; /* Only publish data originating from the
190 * specified origin */
191 } logical;
192 } proto;
194
195struct WalReceiverConn;
197
198/*
199 * Status of walreceiver query execution.
200 *
201 * We only define statuses that are currently used.
202 */
203typedef enum
204{
205 WALRCV_ERROR, /* There was error when executing the query. */
206 WALRCV_OK_COMMAND, /* Query executed utility or replication
207 * command. */
208 WALRCV_OK_TUPLES, /* Query returned tuples. */
209 WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
210 WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
211 WALRCV_OK_COPY_BOTH, /* Query started COPY BOTH replication
212 * protocol. */
214
215/*
216 * Return value for walrcv_exec, returns the status of the execution and
217 * tuples if any.
218 */
227
228/* WAL receiver - libpqwalreceiver hooks */
229
230/*
231 * walrcv_connect_fn
232 *
233 * Establish connection to a cluster. 'replication' is true if the
234 * connection is a replication connection, and false if it is a
235 * regular connection. If it is a replication connection, it could
236 * be either logical or physical based on input argument 'logical'.
237 * 'appname' is a name associated to the connection, to use for example
238 * with fallback_application_name or application_name. Returns the
239 * details about the connection established, as defined by
240 * WalReceiverConn for each WAL receiver module. On error, NULL is
241 * returned with 'err' including the error generated.
242 */
243typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
244 bool replication,
245 bool logical,
247 const char *appname,
248 char **err);
249
250/*
251 * walrcv_check_conninfo_fn
252 *
253 * Parse and validate the connection string given as of 'conninfo'.
254 */
255typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
256 bool must_use_password);
257
258/*
259 * walrcv_get_conninfo_fn
260 *
261 * Returns a user-displayable conninfo string. Note that any
262 * security-sensitive fields should be obfuscated.
263 */
264typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
265
266/*
267 * walrcv_get_senderinfo_fn
268 *
269 * Provide information of the WAL sender this WAL receiver is connected
270 * to, as of 'sender_host' for the host of the sender and 'sender_port'
271 * for its port.
272 */
274 char **sender_host,
275 int *sender_port);
276
277/*
278 * walrcv_identify_system_fn
279 *
280 * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
281 * identity of the cluster. Returns the system ID of the cluster
282 * connected to. 'primary_tli' is the timeline ID of the sender.
283 */
284typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
286
287/*
288 * walrcv_get_dbname_from_conninfo_fn
289 *
290 * Returns the database name from the primary_conninfo
291 */
292typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
293
294/*
295 * walrcv_server_version_fn
296 *
297 * Returns the version number of the cluster connected to.
298 */
300
301/*
302 * walrcv_readtimelinehistoryfile_fn
303 *
304 * Fetch from cluster the timeline history file for timeline 'tli'.
305 * Returns the name of the timeline history file as of 'filename', its
306 * contents as of 'content' and its 'size'.
307 */
309 TimeLineID tli,
310 char **filename,
311 char **content,
312 int *size);
313
314/*
315 * walrcv_startstreaming_fn
316 *
317 * Start streaming WAL data from given streaming options. Returns true
318 * if the connection has switched successfully to copy-both mode and false
319 * if the server received the command and executed it successfully, but
320 * didn't switch to copy-mode.
321 */
324
325/*
326 * walrcv_endstreaming_fn
327 *
328 * Stop streaming of WAL data. Returns the next timeline ID of the cluster
329 * connected to in 'next_tli', or 0 if there was no report.
330 */
333
334/*
335 * walrcv_receive_fn
336 *
337 * Receive a message available from the WAL stream. 'buffer' is a pointer
338 * to a buffer holding the message received. Returns the length of the data,
339 * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
340 * be waited on before a retry), and -1 if the cluster ended the COPY.
341 */
343 char **buffer,
345
346/*
347 * walrcv_send_fn
348 *
349 * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
350 * contents.
351 */
353 const char *buffer,
354 int nbytes);
355
356/*
357 * walrcv_create_slot_fn
358 *
359 * Create a new replication slot named 'slotname'. 'temporary' defines
360 * if the slot is temporary. 'snapshot_action' defines the behavior wanted
361 * for an exported snapshot (see replication protocol for more details).
362 * 'lsn' includes the LSN position at which the created slot became
363 * consistent. Returns the name of the exported snapshot for a logical
364 * slot, or NULL for a physical slot.
365 */
366typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
367 const char *slotname,
368 bool temporary,
369 bool two_phase,
370 bool failover,
372 XLogRecPtr *lsn);
373
374/*
375 * walrcv_alter_slot_fn
376 *
377 * Change the definition of a replication slot. Currently, it supports
378 * changing the failover and two_phase properties of the slot.
379 */
381 const char *slotname,
382 const bool *failover,
383 const bool *two_phase);
384
385
386/*
387 * walrcv_get_backend_pid_fn
388 *
389 * Returns the PID of the remote backend process.
390 */
392
393/*
394 * walrcv_exec_fn
395 *
396 * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
397 * is the expected number of returned attributes, and 'retTypes' an array
398 * including their type OIDs. Returns the status of the execution and
399 * tuples if any.
400 */
401typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
402 const char *query,
403 const int nRetTypes,
404 const Oid *retTypes);
405
406/*
407 * walrcv_disconnect_fn
408 *
409 * Disconnect with the cluster.
410 */
412
433
435
436#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
437 WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
438#define walrcv_check_conninfo(conninfo, must_use_password) \
439 WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
440#define walrcv_get_conninfo(conn) \
441 WalReceiverFunctions->walrcv_get_conninfo(conn)
442#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
443 WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
444#define walrcv_identify_system(conn, primary_tli) \
445 WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
446#define walrcv_get_dbname_from_conninfo(conninfo) \
447 WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
448#define walrcv_server_version(conn) \
449 WalReceiverFunctions->walrcv_server_version(conn)
450#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
451 WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
452#define walrcv_startstreaming(conn, options) \
453 WalReceiverFunctions->walrcv_startstreaming(conn, options)
454#define walrcv_endstreaming(conn, next_tli) \
455 WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
456#define walrcv_receive(conn, buffer, wait_fd) \
457 WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
458#define walrcv_send(conn, buffer, nbytes) \
459 WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
460#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
461 WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
462#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
463 WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
464#define walrcv_get_backend_pid(conn) \
465 WalReceiverFunctions->walrcv_get_backend_pid(conn)
466#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
467 WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
468#define walrcv_disconnect(conn) \
469 WalReceiverFunctions->walrcv_disconnect(conn)
470
471static inline void
473{
474 if (!walres)
475 return;
476
477 if (walres->err)
478 pfree(walres->err);
479
480 if (walres->tuplestore)
481 tuplestore_end(walres->tuplestore);
482
483 if (walres->tupledesc)
484 FreeTupleDesc(walres->tupledesc);
485
486 pfree(walres);
487}
488
489/* prototypes for functions in walreceiver.c */
490pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len);
491extern void WalRcvForceReply(void);
492
493/* prototypes for functions in walreceiverfuncs.c */
494extern Size WalRcvShmemSize(void);
495extern void WalRcvShmemInit(void);
496extern void ShutdownWalRcv(void);
497extern bool WalRcvStreaming(void);
498extern bool WalRcvRunning(void);
499extern WalRcvState WalRcvGetState(void);
501 const char *conninfo, const char *slotname,
502 bool create_temp_slot);
505extern int GetReplicationApplyDelay(void);
506extern int GetReplicationTransferLatency(void);
507
508#endif /* _WALRECEIVER_H */
#define PGDLLIMPORT
Definition c.h:1328
#define pg_noreturn
Definition c.h:164
uint32_t uint32
Definition c.h:546
size_t Size
Definition c.h:619
int64 TimestampTz
Definition timestamp.h:39
void err(int eval, const char *fmt,...)
Definition err.c:43
void pfree(void *pointer)
Definition mcxt.c:1616
#define NAMEDATALEN
static char * filename
Definition pg_dumpall.c:120
static bool two_phase
static bool failover
int64 pg_time_t
Definition pgtime.h:23
int pgsocket
Definition port.h:29
unsigned int Oid
static int fb(int x)
int ProcNumber
Definition procnumber.h:24
PGconn * conn
Definition streamutil.c:52
Definition pg_list.h:54
TimestampTz lastMsgReceiptTime
XLogRecPtr latestWalEnd
TimeLineID receiveStartTLI
Definition walreceiver.h:88
TimeLineID receivedTLI
Definition walreceiver.h:98
XLogRecPtr latestChunkStart
XLogRecPtr receiveStart
Definition walreceiver.h:87
XLogRecPtr flushedUpto
Definition walreceiver.h:97
sig_atomic_t force_reply
ProcNumber procno
Definition walreceiver.h:68
ConditionVariable walRcvStoppedCV
Definition walreceiver.h:73
bool is_temp_slot
pg_atomic_uint64 writtenUpto
pg_time_t startTime
Definition walreceiver.h:79
TimestampTz lastMsgSendTime
WalRcvState walRcvState
Definition walreceiver.h:72
TimestampTz latestWalEndTime
bool ready_to_display
slock_t mutex
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
XLogRecPtr startpoint
TimeLineID startpointTLI
walrcv_identify_system_fn walrcv_identify_system
walrcv_receive_fn walrcv_receive
walrcv_server_version_fn walrcv_server_version
walrcv_disconnect_fn walrcv_disconnect
walrcv_get_backend_pid_fn walrcv_get_backend_pid
walrcv_connect_fn walrcv_connect
walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo
walrcv_get_senderinfo_fn walrcv_get_senderinfo
walrcv_check_conninfo_fn walrcv_check_conninfo
walrcv_send_fn walrcv_send
walrcv_get_conninfo_fn walrcv_get_conninfo
walrcv_exec_fn walrcv_exec
walrcv_startstreaming_fn walrcv_startstreaming
walrcv_create_slot_fn walrcv_create_slot
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile
walrcv_alter_slot_fn walrcv_alter_slot
walrcv_endstreaming_fn walrcv_endstreaming
void FreeTupleDesc(TupleDesc tupdesc)
Definition tupdesc.c:502
void tuplestore_end(Tuplestorestate *state)
Definition tuplestore.c:492
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
void(* walrcv_endstreaming_fn)(WalReceiverConn *conn, TimeLineID *next_tli)
bool WalRcvStreaming(void)
char *(* walrcv_get_dbname_from_conninfo_fn)(const char *conninfo)
void(* walrcv_send_fn)(WalReceiverConn *conn, const char *buffer, int nbytes)
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
PGDLLIMPORT int wal_receiver_status_interval
Definition walreceiver.c:89
char *(* walrcv_create_slot_fn)(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
pg_noreturn void WalReceiverMain(const void *startup_data, size_t startup_data_len)
WalRcvExecStatus
@ WALRCV_OK_COPY_IN
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
@ WALRCV_OK_TUPLES
@ WALRCV_OK_COPY_OUT
@ WALRCV_OK_COPY_BOTH
WalReceiverConn *(* walrcv_connect_fn)(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err)
pid_t(* walrcv_get_backend_pid_fn)(WalReceiverConn *conn)
XLogRecPtr GetWalRcvWriteRecPtr(void)
#define MAXCONNINFO
Definition walreceiver.h:37
void ShutdownWalRcv(void)
bool(* walrcv_startstreaming_fn)(WalReceiverConn *conn, const WalRcvStreamOptions *options)
int(* walrcv_receive_fn)(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
void(* walrcv_readtimelinehistoryfile_fn)(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *size)
WalRcvExecResult *(* walrcv_exec_fn)(WalReceiverConn *conn, const char *query, const int nRetTypes, const Oid *retTypes)
PGDLLIMPORT int wal_receiver_timeout
Definition walreceiver.c:90
static void walrcv_clear_result(WalRcvExecResult *walres)
WalRcvState WalRcvGetState(void)
void(* walrcv_check_conninfo_fn)(const char *conninfo, bool must_use_password)
PGDLLIMPORT WalReceiverFunctionsType * WalReceiverFunctions
Definition walreceiver.c:95
void(* walrcv_disconnect_fn)(WalReceiverConn *conn)
bool WalRcvRunning(void)
WalRcvState
Definition walreceiver.h:46
@ WALRCV_STARTING
Definition walreceiver.h:48
@ WALRCV_STOPPED
Definition walreceiver.h:47
@ WALRCV_CONNECTING
Definition walreceiver.h:50
@ WALRCV_RESTARTING
Definition walreceiver.h:53
@ WALRCV_STREAMING
Definition walreceiver.h:51
@ WALRCV_WAITING
Definition walreceiver.h:52
@ WALRCV_STOPPING
Definition walreceiver.h:54
char *(* walrcv_identify_system_fn)(WalReceiverConn *conn, TimeLineID *primary_tli)
void(* walrcv_get_senderinfo_fn)(WalReceiverConn *conn, char **sender_host, int *sender_port)
int GetReplicationApplyDelay(void)
PGDLLIMPORT WalRcvData * WalRcv
void WalRcvShmemInit(void)
char *(* walrcv_get_conninfo_fn)(WalReceiverConn *conn)
Size WalRcvShmemSize(void)
void(* walrcv_alter_slot_fn)(WalReceiverConn *conn, const char *slotname, const bool *failover, const bool *two_phase)
PGDLLIMPORT bool hot_standby_feedback
Definition walreceiver.c:91
int(* walrcv_server_version_fn)(WalReceiverConn *conn)
void WalRcvForceReply(void)
int GetReplicationTransferLatency(void)
CRSSnapshotAction
Definition walsender.h:21
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI