PostgreSQL Source Code git master
basebackup_copy.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * basebackup_copy.c
4 * send basebackup archives using COPY OUT
5 *
6 * We send a result set with information about the tablespaces to be included
7 * in the backup before starting COPY OUT. Then, we start a single COPY OUT
8 * operation and transmit all the archives and the manifest if present during
9 * the course of that single COPY OUT. Each CopyData message begins with a
10 * type byte, allowing us to signal the start of a new archive, or the
11 * manifest, by some means other than ending the COPY stream. This also allows
12 * for future protocol extensions, since we can include arbitrary information
13 * in the message stream as long as we're certain that the client will know
14 * what to do with it.
15 *
16 * An older method that sent each archive using a separate COPY OUT
17 * operation is no longer supported.
18 *
19 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
20 *
21 * IDENTIFICATION
22 * src/backend/backup/basebackup_copy.c
23 *
24 *-------------------------------------------------------------------------
25 */
26#include "postgres.h"
27
28#include "access/tupdesc.h"
29#include "backup/basebackup.h"
31#include "catalog/pg_type_d.h"
32#include "executor/executor.h"
33#include "libpq/libpq.h"
34#include "libpq/pqformat.h"
35#include "tcop/dest.h"
36#include "utils/builtins.h"
37#include "utils/timestamp.h"
38
39typedef struct bbsink_copystream
40{
41 /* Common information for all types of sink. */
43
44 /* Are we sending the archives to the client, or somewhere else? */
46
47 /*
48 * Protocol message buffer. We assemble CopyData protocol messages by
49 * setting the first character of this buffer to 'd' (archive or manifest
50 * data) and then making base.bbs_buffer point to the second character so
51 * that the rest of the data gets copied into the message just where we
52 * want it.
53 */
54 char *msgbuffer;
55
56 /*
57 * When did we last report progress to the client, and how much progress
58 * did we report?
59 */
63
64/*
65 * We don't want to send progress messages to the client excessively
66 * frequently. Ideally, we'd like to send a message when the time since the
67 * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
68 * the system time every time we send a tiny bit of data seems too expensive.
69 * So we only check it after the number of bytes since the last check reaches
70 * PROGRESS_REPORT_BYTE_INTERVAL.
71 */
72#define PROGRESS_REPORT_BYTE_INTERVAL 65536
73#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
74
75static void bbsink_copystream_begin_backup(bbsink *sink);
77 const char *archive_name);
78static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
79static void bbsink_copystream_end_archive(bbsink *sink);
81static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
82static void bbsink_copystream_end_manifest(bbsink *sink);
83static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
84 TimeLineID endtli);
85static void bbsink_copystream_cleanup(bbsink *sink);
86
87static void SendCopyOutResponse(void);
88static void SendCopyDone(void);
89static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
90static void SendTablespaceList(List *tablespaces);
91
94 .begin_archive = bbsink_copystream_begin_archive,
95 .archive_contents = bbsink_copystream_archive_contents,
96 .end_archive = bbsink_copystream_end_archive,
97 .begin_manifest = bbsink_copystream_begin_manifest,
98 .manifest_contents = bbsink_copystream_manifest_contents,
99 .end_manifest = bbsink_copystream_end_manifest,
100 .end_backup = bbsink_copystream_end_backup,
102};
103
104/*
105 * Create a new 'copystream' bbsink.
106 */
107bbsink *
108bbsink_copystream_new(bool send_to_client)
109{
111
112 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
113 sink->send_to_client = send_to_client;
114
115 /* Set up for periodic progress reporting. */
118
119 return &sink->base;
120}
121
122/*
123 * Send start-of-backup wire protocol messages.
124 */
125static void
127{
128 bbsink_copystream *mysink = (bbsink_copystream *) sink;
130 char *buf;
131
132 /*
133 * Initialize buffer. We ultimately want to send the archive and manifest
134 * data by means of CopyData messages where the payload portion of each
135 * message begins with a type byte. However, basebackup.c expects the
136 * buffer to be aligned, so we can't just allocate one extra byte for the
137 * type byte. Instead, allocate enough extra bytes that the portion of the
138 * buffer we reveal to our callers can be aligned, while leaving room to
139 * slip the type byte in just beforehand. That will allow us to ship the
140 * data with a single call to pq_putmessage and without needing any extra
141 * copying.
142 */
143 buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144 mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145 mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
146 mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
147
148 /* Tell client the backup start location. */
149 SendXlogRecPtrResult(state->startptr, state->starttli);
150
151 /* Send client a list of tablespaces. */
152 SendTablespaceList(state->tablespaces);
153
154 /* Send a CommandComplete message */
156
157 /* Begin COPY stream. This will be used for all archives + manifest. */
159}
160
161/*
162 * Send a CopyData message announcing the beginning of a new archive.
163 */
164static void
165bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
166{
168 tablespaceinfo *ti;
170
171 ti = list_nth(state->tablespaces, state->tablespace_num);
173 pq_sendbyte(&buf, 'n'); /* New archive */
174 pq_sendstring(&buf, archive_name);
175 pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
177}
178
179/*
180 * Send a CopyData message containing a chunk of archive content.
181 */
182static void
184{
185 bbsink_copystream *mysink = (bbsink_copystream *) sink;
186 bbsink_state *state = mysink->base.bbs_state;
188 uint64 targetbytes;
189
190 /* Send the archive content to the client, if appropriate. */
191 if (mysink->send_to_client)
192 {
193 /* Add one because we're also sending a leading type byte. */
194 pq_putmessage('d', mysink->msgbuffer, len + 1);
195 }
196
197 /* Consider whether to send a progress report to the client. */
198 targetbytes = mysink->bytes_done_at_last_time_check
200 if (targetbytes <= state->bytes_done)
201 {
203 long ms;
204
205 /*
206 * OK, we've sent a decent number of bytes, so check the system time
207 * to see whether we're due to send a progress report.
208 */
209 mysink->bytes_done_at_last_time_check = state->bytes_done;
211 now);
212
213 /*
214 * Send a progress report if enough time has passed. Also send one if
215 * the system clock was set backward, so that such occurrences don't
216 * have the effect of suppressing further progress messages.
217 */
219 now < mysink->last_progress_report_time)
220 {
222
224 pq_sendbyte(&buf, 'p'); /* Progress report */
225 pq_sendint64(&buf, state->bytes_done);
228 }
229 }
230}
231
232/*
233 * We don't need to explicitly signal the end of the archive; the client
234 * will figure out that we've reached the end when we begin the next one,
235 * or begin the manifest, or end the COPY stream. However, this seems like
236 * a good time to force out a progress report. One reason for that is that
237 * if this is the last archive, and we don't force a progress report now,
238 * the client will never be told that we sent all the bytes.
239 */
240static void
242{
243 bbsink_copystream *mysink = (bbsink_copystream *) sink;
244 bbsink_state *state = mysink->base.bbs_state;
246
247 mysink->bytes_done_at_last_time_check = state->bytes_done;
250 pq_sendbyte(&buf, 'p'); /* Progress report */
251 pq_sendint64(&buf, state->bytes_done);
254}
255
256/*
257 * Send a CopyData message announcing the beginning of the backup manifest.
258 */
259static void
261{
263
265 pq_sendbyte(&buf, 'm'); /* Manifest */
267}
268
269/*
270 * Each chunk of manifest data is sent using a CopyData message.
271 */
272static void
274{
275 bbsink_copystream *mysink = (bbsink_copystream *) sink;
276
277 if (mysink->send_to_client)
278 {
279 /* Add one because we're also sending a leading type byte. */
280 pq_putmessage('d', mysink->msgbuffer, len + 1);
281 }
282}
283
284/*
285 * We don't need an explicit terminator for the backup manifest.
286 */
287static void
289{
290 /* Do nothing. */
291}
292
293/*
294 * Send end-of-backup wire protocol messages.
295 */
296static void
298 TimeLineID endtli)
299{
300 SendCopyDone();
301 SendXlogRecPtrResult(endptr, endtli);
302}
303
304/*
305 * Cleanup.
306 */
307static void
309{
310 /* Nothing to do. */
311}
312
313/*
314 * Send a CopyOutResponse message.
315 */
316static void
318{
320
322 pq_sendbyte(&buf, 0); /* overall format */
323 pq_sendint16(&buf, 0); /* natts */
325}
326
327/*
328 * Send a CopyDone message.
329 */
330static void
332{
334}
335
336/*
337 * Send a single result set containing just one row: an XLogRecPtr
338 * (in text format) and the associated timeline ID
339 */
340static void
342{
344 TupOutputState *tstate;
345 TupleDesc tupdesc;
346 Datum values[2];
347 bool nulls[2] = {0};
348
350
351 tupdesc = CreateTemplateTupleDesc(2);
352 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
353
354 /*
355 * int8 may seem like a surprising data type for this, but in theory int4
356 * would not be wide enough for this, as TimeLineID is unsigned.
357 */
358 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
359
360 /* send RowDescription */
361 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
362
363 /* Data row */
365 values[1] = Int64GetDatum(tli);
366 do_tup_output(tstate, values, nulls);
367
368 end_tup_output(tstate);
369
370 /* Send a CommandComplete message */
372}
373
374/*
375 * Send a result set via libpq describing the tablespace list.
376 */
377static void
379{
381 TupOutputState *tstate;
382 TupleDesc tupdesc;
383 ListCell *lc;
384
386
387 tupdesc = CreateTemplateTupleDesc(3);
388 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
389 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
390 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
391
392 /* send RowDescription */
393 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
394
395 /* Construct and send the directory information */
396 foreach(lc, tablespaces)
397 {
398 tablespaceinfo *ti = lfirst(lc);
399 Datum values[3];
400 bool nulls[3] = {0};
401
402 /* Send one datarow message */
403 if (ti->path == NULL)
404 {
405 nulls[0] = true;
406 nulls[1] = true;
407 }
408 else
409 {
410 values[0] = ObjectIdGetDatum(ti->oid);
412 }
413 if (ti->size >= 0)
414 values[2] = Int64GetDatum(ti->size / 1024);
415 else
416 nulls[2] = true;
417
418 do_tup_output(tstate, values, nulls);
419 }
420
421 end_tup_output(tstate);
422}
int16 AttrNumber
Definition: attnum.h:21
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static void SendCopyDone(void)
bbsink * bbsink_copystream_new(bool send_to_client)
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static const bbsink_ops bbsink_copystream_ops
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
struct bbsink_copystream bbsink_copystream
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
uint64_t uint64
Definition: c.h:489
#define UINT64CONST(x)
Definition: c.h:503
int64 TimestampTz
Definition: timestamp.h:39
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2462
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2520
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2442
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
static char * buf
Definition: pg_test_fsync.c:72
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:388
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:367
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_CommandComplete
Definition: protocol.h:42
#define PqMsg_CopyOutResponse
Definition: protocol.h:46
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
Definition: pg_list.h:54
TimestampTz last_progress_report_time
uint64 bytes_done_at_last_time_check
void(* begin_backup)(bbsink *sink)
bbsink_state * bbs_state
char * bbs_buffer
const bbsink_ops * bbs_ops
size_t bbs_buffer_length
Definition: regguts.h:323
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:164
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:874
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59