PostgreSQL Source Code  git master
basebackup_copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_copy.c
4  * send basebackup archives using COPY OUT
5  *
6  * We send a result set with information about the tablespaces to be included
7  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
8  * operation and transmit all the archives and the manifest if present during
9  * the course of that single COPY OUT. Each CopyData message begins with a
10  * type byte, allowing us to signal the start of a new archive, or the
11  * manifest, by some means other than ending the COPY stream. This also allows
12  * for future protocol extensions, since we can include arbitrary information
13  * in the message stream as long as we're certain that the client will know
14  * what to do with it.
15  *
16  * An older method that sent each archive using a separate COPY OUT
17  * operation is no longer supported.
18  *
19  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
20  *
21  * IDENTIFICATION
22  * src/backend/backup/basebackup_copy.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres.h"
27 
28 #include "access/tupdesc.h"
29 #include "backup/basebackup.h"
30 #include "backup/basebackup_sink.h"
31 #include "catalog/pg_type_d.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "tcop/dest.h"
36 #include "utils/builtins.h"
37 #include "utils/timestamp.h"
38 
39 typedef struct bbsink_copystream
40 {
41  /* Common information for all types of sink. */
43 
44  /* Are we sending the archives to the client, or somewhere else? */
46 
47  /*
48  * Protocol message buffer. We assemble CopyData protocol messages by
49  * setting the first character of this buffer to 'd' (archive or manifest
50  * data) and then making base.bbs_buffer point to the second character so
51  * that the rest of the data gets copied into the message just where we
52  * want it.
53  */
54  char *msgbuffer;
55 
56  /*
57  * When did we last report progress to the client, and how much progress
58  * did we report?
59  */
63 
64 /*
65  * We don't want to send progress messages to the client excessively
66  * frequently. Ideally, we'd like to send a message when the time since the
67  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
68  * the system time every time we send a tiny bit of data seems too expensive.
69  * So we only check it after the number of bytes since the last check reaches
70  * PROGRESS_REPORT_BYTE_INTERVAL.
71  */
72 #define PROGRESS_REPORT_BYTE_INTERVAL 65536
73 #define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
74 
/*
 * Forward declarations of the bbsink callbacks implemented by the
 * 'copystream' sink. Each one is defined further down in this file, and
 * they are collected in the bbsink_copystream_ops table.
 */
75 static void bbsink_copystream_begin_backup(bbsink *sink);
76 static void bbsink_copystream_begin_archive(bbsink *sink,
77  const char *archive_name);
78 static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
79 static void bbsink_copystream_end_archive(bbsink *sink);
80 static void bbsink_copystream_begin_manifest(bbsink *sink);
81 static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
82 static void bbsink_copystream_end_manifest(bbsink *sink);
83 static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
84  TimeLineID endtli);
85 static void bbsink_copystream_cleanup(bbsink *sink);
86 
/*
 * Forward declarations of file-local helpers that send individual wire
 * protocol messages (CopyOutResponse, CopyDone) and result sets (the
 * XLogRecPtr/timeline row and the tablespace list) to the client.
 */
87 static void SendCopyOutResponse(void);
88 static void SendCopyDone(void);
89 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
90 static void SendTablespaceList(List *tablespaces);
91 
94  .begin_archive = bbsink_copystream_begin_archive,
95  .archive_contents = bbsink_copystream_archive_contents,
96  .end_archive = bbsink_copystream_end_archive,
97  .begin_manifest = bbsink_copystream_begin_manifest,
98  .manifest_contents = bbsink_copystream_manifest_contents,
99  .end_manifest = bbsink_copystream_end_manifest,
100  .end_backup = bbsink_copystream_end_backup,
101  .cleanup = bbsink_copystream_cleanup
102 };
103 
104 /*
105  * Create a new 'copystream' bbsink.
106  */
107 bbsink *
108 bbsink_copystream_new(bool send_to_client)
109 {
111 
112  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
113  sink->send_to_client = send_to_client;
114 
115  /* Set up for periodic progress reporting. */
117  sink->bytes_done_at_last_time_check = UINT64CONST(0);
118 
119  return &sink->base;
120 }
121 
122 /*
123  * Send start-of-backup wire protocol messages.
124  */
125 static void
127 {
128  bbsink_copystream *mysink = (bbsink_copystream *) sink;
129  bbsink_state *state = sink->bbs_state;
130  char *buf;
131 
132  /*
133  * Initialize buffer. We ultimately want to send the archive and manifest
134  * data by means of CopyData messages where the payload portion of each
135  * message begins with a type byte. However, basebackup.c expects the
136  * buffer to be aligned, so we can't just allocate one extra byte for the
137  * type byte. Instead, allocate enough extra bytes that the portion of the
138  * buffer we reveal to our callers can be aligned, while leaving room to
139  * slip the type byte in just beforehand. That will allow us to ship the
140  * data with a single call to pq_putmessage and without needing any extra
141  * copying.
142  */
143  buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144  mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145  mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
146  mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
147 
148  /* Tell client the backup start location. */
149  SendXlogRecPtrResult(state->startptr, state->starttli);
150 
151  /* Send client a list of tablespaces. */
152  SendTablespaceList(state->tablespaces);
153 
154  /* Send a CommandComplete message */
155  pq_puttextmessage('C', "SELECT");
156 
157  /* Begin COPY stream. This will be used for all archives + manifest. */
159 }
160 
161 /*
162  * Send a CopyData message announcing the beginning of a new archive.
163  */
164 static void
165 bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
166 {
167  bbsink_state *state = sink->bbs_state;
168  tablespaceinfo *ti;
170 
171  ti = list_nth(state->tablespaces, state->tablespace_num);
172  pq_beginmessage(&buf, 'd'); /* CopyData */
173  pq_sendbyte(&buf, 'n'); /* New archive */
174  pq_sendstring(&buf, archive_name);
175  pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
176  pq_endmessage(&buf);
177 }
178 
179 /*
180  * Send a CopyData message containing a chunk of archive content.
181  */
182 static void
184 {
185  bbsink_copystream *mysink = (bbsink_copystream *) sink;
186  bbsink_state *state = mysink->base.bbs_state;
188  uint64 targetbytes;
189 
190  /* Send the archive content to the client, if appropriate. */
191  if (mysink->send_to_client)
192  {
193  /* Add one because we're also sending a leading type byte. */
194  pq_putmessage('d', mysink->msgbuffer, len + 1);
195  }
196 
197  /* Consider whether to send a progress report to the client. */
198  targetbytes = mysink->bytes_done_at_last_time_check
200  if (targetbytes <= state->bytes_done)
201  {
203  long ms;
204 
205  /*
206  * OK, we've sent a decent number of bytes, so check the system time
207  * to see whether we're due to send a progress report.
208  */
209  mysink->bytes_done_at_last_time_check = state->bytes_done;
211  now);
212 
213  /*
214  * Send a progress report if enough time has passed. Also send one if
215  * the system clock was set backward, so that such occurrences don't
216  * have the effect of suppressing further progress messages.
217  */
219  now < mysink->last_progress_report_time)
220  {
221  mysink->last_progress_report_time = now;
222 
223  pq_beginmessage(&buf, 'd'); /* CopyData */
224  pq_sendbyte(&buf, 'p'); /* Progress report */
225  pq_sendint64(&buf, state->bytes_done);
226  pq_endmessage(&buf);
228  }
229  }
230 }
231 
232 /*
233  * We don't need to explicitly signal the end of the archive; the client
234  * will figure out that we've reached the end when we begin the next one,
235  * or begin the manifest, or end the COPY stream. However, this seems like
236  * a good time to force out a progress report. One reason for that is that
237  * if this is the last archive, and we don't force a progress report now,
238  * the client will never be told that we sent all the bytes.
239  */
240 static void
242 {
243  bbsink_copystream *mysink = (bbsink_copystream *) sink;
244  bbsink_state *state = mysink->base.bbs_state;
246 
247  mysink->bytes_done_at_last_time_check = state->bytes_done;
249  pq_beginmessage(&buf, 'd'); /* CopyData */
250  pq_sendbyte(&buf, 'p'); /* Progress report */
251  pq_sendint64(&buf, state->bytes_done);
252  pq_endmessage(&buf);
254 }
255 
256 /*
257  * Send a CopyData message announcing the beginning of the backup manifest.
258  */
259 static void
261 {
263 
264  pq_beginmessage(&buf, 'd'); /* CopyData */
265  pq_sendbyte(&buf, 'm'); /* Manifest */
266  pq_endmessage(&buf);
267 }
268 
269 /*
270  * Each chunk of manifest data is sent using a CopyData message.
271  */
272 static void
274 {
275  bbsink_copystream *mysink = (bbsink_copystream *) sink;
276 
277  if (mysink->send_to_client)
278  {
279  /* Add one because we're also sending a leading type byte. */
280  pq_putmessage('d', mysink->msgbuffer, len + 1);
281  }
282 }
283 
284 /*
285  * We don't need an explicit terminator for the backup manifest.
286  */
287 static void
289 {
290  /* Do nothing. */
291 }
292 
293 /*
294  * Send end-of-backup wire protocol messages.
295  */
296 static void
298  TimeLineID endtli)
299 {
300  SendCopyDone();
301  SendXlogRecPtrResult(endptr, endtli);
302 }
303 
304 /*
305  * Cleanup.
306  */
307 static void
309 {
310  /* Nothing to do. */
311 }
312 
313 /*
314  * Send a CopyOutResponse message.
315  */
316 static void
318 {
320 
321  pq_beginmessage(&buf, 'H');
322  pq_sendbyte(&buf, 0); /* overall format */
323  pq_sendint16(&buf, 0); /* natts */
324  pq_endmessage(&buf);
325 }
326 
327 /*
328  * Send a CopyDone message.
329  */
330 static void
332 {
333  pq_putemptymessage('c');
334 }
335 
336 /*
337  * Send a single resultset containing just a single
338  * XLogRecPtr record (in text format)
339  */
340 static void
342 {
344  TupOutputState *tstate;
345  TupleDesc tupdesc;
346  Datum values[2];
347  bool nulls[2] = {0};
348 
350 
351  tupdesc = CreateTemplateTupleDesc(2);
352  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
353  /*
354  * int8 may seem like a surprising data type for this, but in theory int4
355  * would not be wide enough for this, as TimeLineID is unsigned.
356  */
357  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
358 
359  /* send RowDescription */
360  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
361 
362  /* Data row */
364  values[1] = Int64GetDatum(tli);
365  do_tup_output(tstate, values, nulls);
366 
367  end_tup_output(tstate);
368 
369  /* Send a CommandComplete message */
370  pq_puttextmessage('C', "SELECT");
371 }
372 
373 /*
374  * Send a result set via libpq describing the tablespace list.
375  */
376 static void
378 {
380  TupOutputState *tstate;
381  TupleDesc tupdesc;
382  ListCell *lc;
383 
385 
386  tupdesc = CreateTemplateTupleDesc(3);
387  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
388  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
389  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
390 
391  /* send RowDescription */
392  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
393 
394  /* Construct and send the directory information */
395  foreach(lc, tablespaces)
396  {
397  tablespaceinfo *ti = lfirst(lc);
398  Datum values[3];
399  bool nulls[3] = {0};
400 
401  /* Send one datarow message */
402  if (ti->path == NULL)
403  {
404  nulls[0] = true;
405  nulls[1] = true;
406  }
407  else
408  {
409  values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
410  values[1] = CStringGetTextDatum(ti->path);
411  }
412  if (ti->size >= 0)
413  values[2] = Int64GetDatum(ti->size / 1024);
414  else
415  nulls[2] = true;
416 
417  do_tup_output(tstate, values, nulls);
418  }
419 
420  end_tup_output(tstate);
421 }
int16 AttrNumber
Definition: attnum.h:21
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1706
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1585
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1549
bbsink * bbsink_copystream_new(bool send_to_client)
static void SendCopyDone(void)
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static const bbsink_ops bbsink_copystream_ops
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
struct bbsink_copystream bbsink_copystream
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
int64 TimestampTz
Definition: timestamp.h:39
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1794
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
void * palloc0(Size size)
Definition: mcxt.c:1241
void * palloc(Size size)
Definition: mcxt.c:1210
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
static char * buf
Definition: pg_test_fsync.c:67
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:391
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:370
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:198
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
Definition: pg_list.h:54
TimestampTz last_progress_report_time
uint64 bytes_done_at_last_time_check
void(* begin_backup)(bbsink *sink)
bbsink_state * bbs_state
char * bbs_buffer
const bbsink_ops * bbs_ops
size_t bbs_buffer_length
Definition: regguts.h:318
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59