PostgreSQL Source Code  git master
basebackup_copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_copy.c
4  * send basebackup archives using COPY OUT
5  *
6  * We send a result set with information about the tablespaces to be included
7  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
8  * operation and transmit all the archives and the manifest if present during
9  * the course of that single COPY OUT. Each CopyData message begins with a
10  * type byte, allowing us to signal the start of a new archive, or the
11  * manifest, by some means other than ending the COPY stream. This also allows
12  * for future protocol extensions, since we can include arbitrary information
13  * in the message stream as long as we're certain that the client will know
14  * what to do with it.
15  *
16  * An older method that sent each archive using a separate COPY OUT
17  * operation is no longer supported.
18  *
19  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
20  *
21  * IDENTIFICATION
22  * src/backend/replication/basebackup_copy.c
23  *
24  *-------------------------------------------------------------------------
25  */
#include "postgres.h"

#include "catalog/pg_type_d.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
#include "utils/timestamp.h"
34 
35 typedef struct bbsink_copystream
36 {
37  /* Common information for all types of sink. */
39 
40  /* Are we sending the archives to the client, or somewhere else? */
42 
43  /*
44  * Protocol message buffer. We assemble CopyData protocol messages by
45  * setting the first character of this buffer to 'd' (archive or manifest
46  * data) and then making base.bbs_buffer point to the second character so
47  * that the rest of the data gets copied into the message just where we
48  * want it.
49  */
50  char *msgbuffer;
51 
52  /*
53  * When did we last report progress to the client, and how much progress
54  * did we report?
55  */
59 
/*
 * We don't want to send progress messages to the client excessively
 * frequently. Ideally, we'd like to send a message when the time since the
 * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
 * the system time every time we send a tiny bit of data seems too expensive.
 * So we only check it after the number of bytes since the last check reaches
 * PROGRESS_REPORT_BYTE_INTERVAL.
 */
#define PROGRESS_REPORT_BYTE_INTERVAL	65536
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD	1000
70 
71 static void bbsink_copystream_begin_backup(bbsink *sink);
72 static void bbsink_copystream_begin_archive(bbsink *sink,
73  const char *archive_name);
74 static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
75 static void bbsink_copystream_end_archive(bbsink *sink);
76 static void bbsink_copystream_begin_manifest(bbsink *sink);
77 static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
78 static void bbsink_copystream_end_manifest(bbsink *sink);
79 static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
80  TimeLineID endtli);
81 static void bbsink_copystream_cleanup(bbsink *sink);
82 
83 static void SendCopyOutResponse(void);
84 static void SendCopyDone(void);
85 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
86 static void SendTablespaceList(List *tablespaces);
87 static void send_int8_string(StringInfoData *buf, int64 intval);
88 
91  .begin_archive = bbsink_copystream_begin_archive,
92  .archive_contents = bbsink_copystream_archive_contents,
93  .end_archive = bbsink_copystream_end_archive,
94  .begin_manifest = bbsink_copystream_begin_manifest,
95  .manifest_contents = bbsink_copystream_manifest_contents,
96  .end_manifest = bbsink_copystream_end_manifest,
97  .end_backup = bbsink_copystream_end_backup,
98  .cleanup = bbsink_copystream_cleanup
99 };
100 
101 /*
102  * Create a new 'copystream' bbsink.
103  */
104 bbsink *
105 bbsink_copystream_new(bool send_to_client)
106 {
108 
109  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
110  sink->send_to_client = send_to_client;
111 
112  /* Set up for periodic progress reporting. */
114  sink->bytes_done_at_last_time_check = UINT64CONST(0);
115 
116  return &sink->base;
117 }
118 
119 /*
120  * Send start-of-backup wire protocol messages.
121  */
122 static void
124 {
125  bbsink_copystream *mysink = (bbsink_copystream *) sink;
126  bbsink_state *state = sink->bbs_state;
127  char *buf;
128 
129  /*
130  * Initialize buffer. We ultimately want to send the archive and manifest
131  * data by means of CopyData messages where the payload portion of each
132  * message begins with a type byte. However, basebackup.c expects the
133  * buffer to be aligned, so we can't just allocate one extra byte for the
134  * type byte. Instead, allocate enough extra bytes that the portion of the
135  * buffer we reveal to our callers can be aligned, while leaving room to
136  * slip the type byte in just beforehand. That will allow us to ship the
137  * data with a single call to pq_putmessage and without needing any extra
138  * copying.
139  */
140  buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
141  mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
142  mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
143  mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
144 
145  /* Tell client the backup start location. */
146  SendXlogRecPtrResult(state->startptr, state->starttli);
147 
148  /* Send client a list of tablespaces. */
149  SendTablespaceList(state->tablespaces);
150 
151  /* Send a CommandComplete message */
152  pq_puttextmessage('C', "SELECT");
153 
154  /* Begin COPY stream. This will be used for all archives + manifest. */
156 }
157 
158 /*
159  * Send a CopyData message announcing the beginning of a new archive.
160  */
161 static void
162 bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
163 {
164  bbsink_state *state = sink->bbs_state;
165  tablespaceinfo *ti;
167 
168  ti = list_nth(state->tablespaces, state->tablespace_num);
169  pq_beginmessage(&buf, 'd'); /* CopyData */
170  pq_sendbyte(&buf, 'n'); /* New archive */
171  pq_sendstring(&buf, archive_name);
172  pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
173  pq_endmessage(&buf);
174 }
175 
176 /*
177  * Send a CopyData message containing a chunk of archive content.
178  */
179 static void
181 {
182  bbsink_copystream *mysink = (bbsink_copystream *) sink;
183  bbsink_state *state = mysink->base.bbs_state;
185  uint64 targetbytes;
186 
187  /* Send the archive content to the client, if appropriate. */
188  if (mysink->send_to_client)
189  {
190  /* Add one because we're also sending a leading type byte. */
191  pq_putmessage('d', mysink->msgbuffer, len + 1);
192  }
193 
194  /* Consider whether to send a progress report to the client. */
195  targetbytes = mysink->bytes_done_at_last_time_check
197  if (targetbytes <= state->bytes_done)
198  {
200  long ms;
201 
202  /*
203  * OK, we've sent a decent number of bytes, so check the system time
204  * to see whether we're due to send a progress report.
205  */
206  mysink->bytes_done_at_last_time_check = state->bytes_done;
208  now);
209 
210  /*
211  * Send a progress report if enough time has passed. Also send one if
212  * the system clock was set backward, so that such occurrences don't
213  * have the effect of suppressing further progress messages.
214  */
215  if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
216  {
217  mysink->last_progress_report_time = now;
218 
219  pq_beginmessage(&buf, 'd'); /* CopyData */
220  pq_sendbyte(&buf, 'p'); /* Progress report */
221  pq_sendint64(&buf, state->bytes_done);
222  pq_endmessage(&buf);
224  }
225  }
226 }
227 
228 /*
229  * We don't need to explicitly signal the end of the archive; the client
230  * will figure out that we've reached the end when we begin the next one,
231  * or begin the manifest, or end the COPY stream. However, this seems like
232  * a good time to force out a progress report. One reason for that is that
233  * if this is the last archive, and we don't force a progress report now,
234  * the client will never be told that we sent all the bytes.
235  */
236 static void
238 {
239  bbsink_copystream *mysink = (bbsink_copystream *) sink;
240  bbsink_state *state = mysink->base.bbs_state;
242 
243  mysink->bytes_done_at_last_time_check = state->bytes_done;
245  pq_beginmessage(&buf, 'd'); /* CopyData */
246  pq_sendbyte(&buf, 'p'); /* Progress report */
247  pq_sendint64(&buf, state->bytes_done);
248  pq_endmessage(&buf);
250 }
251 
252 /*
253  * Send a CopyData message announcing the beginning of the backup manifest.
254  */
255 static void
257 {
259 
260  pq_beginmessage(&buf, 'd'); /* CopyData */
261  pq_sendbyte(&buf, 'm'); /* Manifest */
262  pq_endmessage(&buf);
263 }
264 
265 /*
266  * Each chunk of manifest data is sent using a CopyData message.
267  */
268 static void
270 {
271  bbsink_copystream *mysink = (bbsink_copystream *) sink;
272 
273  if (mysink->send_to_client)
274  {
275  /* Add one because we're also sending a leading type byte. */
276  pq_putmessage('d', mysink->msgbuffer, len + 1);
277  }
278 }
279 
280 /*
281  * We don't need an explicit terminator for the backup manifest.
282  */
283 static void
285 {
286  /* Do nothing. */
287 }
288 
289 /*
290  * Send end-of-backup wire protocol messages.
291  */
292 static void
294  TimeLineID endtli)
295 {
296  SendCopyDone();
297  SendXlogRecPtrResult(endptr, endtli);
298 }
299 
300 /*
301  * Cleanup.
302  */
303 static void
305 {
306  /* Nothing to do. */
307 }
308 
309 /*
310  * Send a CopyOutResponse message.
311  */
312 static void
314 {
316 
317  pq_beginmessage(&buf, 'H');
318  pq_sendbyte(&buf, 0); /* overall format */
319  pq_sendint16(&buf, 0); /* natts */
320  pq_endmessage(&buf);
321 }
322 
/*
 * Send a CopyDone message, ending the COPY OUT stream.
 */
static void
SendCopyDone(void)
{
	pq_putemptymessage('c');
}
331 
332 /*
333  * Send a single resultset containing just a single
334  * XLogRecPtr record (in text format)
335  */
336 static void
338 {
340  char str[MAXFNAMELEN];
341  Size len;
342 
343  pq_beginmessage(&buf, 'T'); /* RowDescription */
344  pq_sendint16(&buf, 2); /* 2 fields */
345 
346  /* Field headers */
347  pq_sendstring(&buf, "recptr");
348  pq_sendint32(&buf, 0); /* table oid */
349  pq_sendint16(&buf, 0); /* attnum */
350  pq_sendint32(&buf, TEXTOID); /* type oid */
351  pq_sendint16(&buf, -1);
352  pq_sendint32(&buf, 0);
353  pq_sendint16(&buf, 0);
354 
355  pq_sendstring(&buf, "tli");
356  pq_sendint32(&buf, 0); /* table oid */
357  pq_sendint16(&buf, 0); /* attnum */
358 
359  /*
360  * int8 may seem like a surprising data type for this, but in theory int4
361  * would not be wide enough for this, as TimeLineID is unsigned.
362  */
363  pq_sendint32(&buf, INT8OID); /* type oid */
364  pq_sendint16(&buf, -1);
365  pq_sendint32(&buf, 0);
366  pq_sendint16(&buf, 0);
367  pq_endmessage(&buf);
368 
369  /* Data row */
370  pq_beginmessage(&buf, 'D');
371  pq_sendint16(&buf, 2); /* number of columns */
372 
373  len = snprintf(str, sizeof(str),
374  "%X/%X", LSN_FORMAT_ARGS(ptr));
375  pq_sendint32(&buf, len);
376  pq_sendbytes(&buf, str, len);
377 
378  len = snprintf(str, sizeof(str), "%u", tli);
379  pq_sendint32(&buf, len);
380  pq_sendbytes(&buf, str, len);
381 
382  pq_endmessage(&buf);
383 
384  /* Send a CommandComplete message */
385  pq_puttextmessage('C', "SELECT");
386 }
387 
388 /*
389  * Send a result set via libpq describing the tablespace list.
390  */
391 static void
393 {
395  ListCell *lc;
396 
397  /* Construct and send the directory information */
398  pq_beginmessage(&buf, 'T'); /* RowDescription */
399  pq_sendint16(&buf, 3); /* 3 fields */
400 
401  /* First field - spcoid */
402  pq_sendstring(&buf, "spcoid");
403  pq_sendint32(&buf, 0); /* table oid */
404  pq_sendint16(&buf, 0); /* attnum */
405  pq_sendint32(&buf, OIDOID); /* type oid */
406  pq_sendint16(&buf, 4); /* typlen */
407  pq_sendint32(&buf, 0); /* typmod */
408  pq_sendint16(&buf, 0); /* format code */
409 
410  /* Second field - spclocation */
411  pq_sendstring(&buf, "spclocation");
412  pq_sendint32(&buf, 0);
413  pq_sendint16(&buf, 0);
414  pq_sendint32(&buf, TEXTOID);
415  pq_sendint16(&buf, -1);
416  pq_sendint32(&buf, 0);
417  pq_sendint16(&buf, 0);
418 
419  /* Third field - size */
420  pq_sendstring(&buf, "size");
421  pq_sendint32(&buf, 0);
422  pq_sendint16(&buf, 0);
423  pq_sendint32(&buf, INT8OID);
424  pq_sendint16(&buf, 8);
425  pq_sendint32(&buf, 0);
426  pq_sendint16(&buf, 0);
427  pq_endmessage(&buf);
428 
429  foreach(lc, tablespaces)
430  {
431  tablespaceinfo *ti = lfirst(lc);
432 
433  /* Send one datarow message */
434  pq_beginmessage(&buf, 'D');
435  pq_sendint16(&buf, 3); /* number of columns */
436  if (ti->path == NULL)
437  {
438  pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
439  pq_sendint32(&buf, -1);
440  }
441  else
442  {
443  Size len;
444 
445  len = strlen(ti->oid);
446  pq_sendint32(&buf, len);
447  pq_sendbytes(&buf, ti->oid, len);
448 
449  len = strlen(ti->path);
450  pq_sendint32(&buf, len);
451  pq_sendbytes(&buf, ti->path, len);
452  }
453  if (ti->size >= 0)
454  send_int8_string(&buf, ti->size / 1024);
455  else
456  pq_sendint32(&buf, -1); /* NULL */
457 
458  pq_endmessage(&buf);
459  }
460 }
461 
462 /*
463  * Send a 64-bit integer as a string via the wire protocol.
464  */
465 static void
467 {
468  char is[32];
469 
470  sprintf(is, INT64_FORMAT, intval);
471  pq_sendint32(buf, strlen(is));
472  pq_sendbytes(buf, is, strlen(is));
473 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1687
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
bbsink * bbsink_copystream_new(bool send_to_client)
static void SendCopyDone(void)
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void send_int8_string(StringInfoData *buf, int64 intval)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static const bbsink_ops bbsink_copystream_ops
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
struct bbsink_copystream bbsink_copystream
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)
#define INT64_FORMAT
Definition: c.h:483
size_t Size
Definition: c.h:540
int64 TimestampTz
Definition: timestamp.h:39
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
void * palloc0(Size size)
Definition: mcxt.c:1099
void * palloc(Size size)
Definition: mcxt.c:1068
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:169
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
static char * buf
Definition: pg_test_fsync.c:67
#define sprintf
Definition: port.h:227
#define snprintf
Definition: port.h:225
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
Definition: pg_list.h:51
TimestampTz last_progress_report_time
uint64 bytes_done_at_last_time_check
void(* begin_backup)(bbsink *sink)
bbsink_state * bbs_state
char * bbs_buffer
const bbsink_ops * bbs_ops
size_t bbs_buffer_length
Definition: regguts.h:318
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59