PostgreSQL Source Code  git master
basebackup_copy.c File Reference
#include "postgres.h"
#include "catalog/pg_type_d.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
#include "utils/timestamp.h"
Include dependency graph for basebackup_copy.c:

Go to the source code of this file.

Data Structures

struct  bbsink_copystream
 

Macros

#define PROGRESS_REPORT_BYTE_INTERVAL   65536
 
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000
 

Typedefs

typedef struct bbsink_copystream bbsink_copystream
 

Functions

static void bbsink_copystream_begin_backup (bbsink *sink)
 
static void bbsink_copystream_begin_archive (bbsink *sink, const char *archive_name)
 
static void bbsink_copystream_archive_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_archive (bbsink *sink)
 
static void bbsink_copystream_begin_manifest (bbsink *sink)
 
static void bbsink_copystream_manifest_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_manifest (bbsink *sink)
 
static void bbsink_copystream_end_backup (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
 
static void bbsink_copystream_cleanup (bbsink *sink)
 
static void SendCopyOutResponse (void)
 
static void SendCopyDone (void)
 
static void SendXlogRecPtrResult (XLogRecPtr ptr, TimeLineID tli)
 
static void SendTablespaceList (List *tablespaces)
 
static void send_int8_string (StringInfoData *buf, int64 intval)
 
bbsinkbbsink_copystream_new (bool send_to_client)
 

Variables

static const bbsink_ops bbsink_copystream_ops
 

Macro Definition Documentation

◆ PROGRESS_REPORT_BYTE_INTERVAL

#define PROGRESS_REPORT_BYTE_INTERVAL   65536

Definition at line 68 of file basebackup_copy.c.

◆ PROGRESS_REPORT_MILLISECOND_THRESHOLD

#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000

Definition at line 69 of file basebackup_copy.c.

Typedef Documentation

◆ bbsink_copystream

Function Documentation

◆ bbsink_copystream_archive_contents()

static void bbsink_copystream_archive_contents ( bbsink sink,
size_t  len 
)
static

Definition at line 180 of file basebackup_copy.c.

181 {
182  bbsink_copystream *mysink = (bbsink_copystream *) sink;
183  bbsink_state *state = mysink->base.bbs_state;
185  uint64 targetbytes;
186 
187  /* Send the archive content to the client, if appropriate. */
188  if (mysink->send_to_client)
189  {
190  /* Add one because we're also sending a leading type byte. */
191  pq_putmessage('d', mysink->msgbuffer, len + 1);
192  }
193 
194  /* Consider whether to send a progress report to the client. */
195  targetbytes = mysink->bytes_done_at_last_time_check
197  if (targetbytes <= state->bytes_done)
198  {
200  long ms;
201 
202  /*
203  * OK, we've sent a decent number of bytes, so check the system time
204  * to see whether we're due to send a progress report.
205  */
206  mysink->bytes_done_at_last_time_check = state->bytes_done;
208  now);
209 
210  /*
211  * Send a progress report if enough time has passed. Also send one if
212  * the system clock was set backward, so that such occurrences don't
213  * have the effect of suppressing further progress messages.
214  */
215  if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
216  {
217  mysink->last_progress_report_time = now;
218 
219  pq_beginmessage(&buf, 'd'); /* CopyData */
220  pq_sendbyte(&buf, 'p'); /* Progress report */
221  pq_sendint64(&buf, state->bytes_done);
222  pq_endmessage(&buf);
224  }
225  }
226 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1687
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
int64 TimestampTz
Definition: timestamp.h:39
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
TimestampTz last_progress_report_time
uint64 bytes_done_at_last_time_check
bbsink_state * bbs_state
Definition: regguts.h:318

References bbsink_copystream::base, bbsink::bbs_state, buf, bbsink_copystream::bytes_done_at_last_time_check, GetCurrentTimestamp(), bbsink_copystream::last_progress_report_time, len, bbsink_copystream::msgbuffer, now(), pq_beginmessage(), pq_endmessage(), pq_flush_if_writable, pq_putmessage, pq_sendbyte(), pq_sendint64(), PROGRESS_REPORT_BYTE_INTERVAL, PROGRESS_REPORT_MILLISECOND_THRESHOLD, bbsink_copystream::send_to_client, and TimestampDifferenceMilliseconds().

◆ bbsink_copystream_begin_archive()

static void bbsink_copystream_begin_archive ( bbsink sink,
const char *  archive_name 
)
static

Definition at line 162 of file basebackup_copy.c.

163 {
164  bbsink_state *state = sink->bbs_state;
165  tablespaceinfo *ti;
167 
168  ti = list_nth(state->tablespaces, state->tablespace_num);
169  pq_beginmessage(&buf, 'd'); /* CopyData */
170  pq_sendbyte(&buf, 'n'); /* New archive */
171  pq_sendstring(&buf, archive_name);
172  pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
173  pq_endmessage(&buf);
174 }
static void * list_nth(const List *list, int n)
Definition: pg_list.h:278
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197

References bbsink::bbs_state, buf, list_nth(), tablespaceinfo::path, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), and pq_sendstring().

◆ bbsink_copystream_begin_backup()

static void bbsink_copystream_begin_backup ( bbsink sink)
static

Definition at line 123 of file basebackup_copy.c.

124 {
125  bbsink_copystream *mysink = (bbsink_copystream *) sink;
126  bbsink_state *state = sink->bbs_state;
127  char *buf;
128 
129  /*
130  * Initialize buffer. We ultimately want to send the archive and manifest
131  * data by means of CopyData messages where the payload portion of each
132  * message begins with a type byte. However, basebackup.c expects the
133  * buffer to be aligned, so we can't just allocate one extra byte for the
134  * type byte. Instead, allocate enough extra bytes that the portion of the
135  * buffer we reveal to our callers can be aligned, while leaving room to
136  * slip the type byte in just beforehand. That will allow us to ship the
137  * data with a single call to pq_putmessage and without needing any extra
138  * copying.
139  */
140  buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
141  mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
142  mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
143  mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
144 
145  /* Tell client the backup start location. */
146  SendXlogRecPtrResult(state->startptr, state->starttli);
147 
148  /* Send client a list of tablespaces. */
149  SendTablespaceList(state->tablespaces);
150 
151  /* Send a CommandComplete message */
152  pq_puttextmessage('C', "SELECT");
153 
154  /* Begin COPY stream. This will be used for all archives + manifest. */
156 }
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
void * palloc(Size size)
Definition: mcxt.c:1068
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
char * bbs_buffer
size_t bbs_buffer_length

References bbsink_copystream::base, bbsink::bbs_buffer, bbsink::bbs_buffer_length, bbsink::bbs_state, buf, bbsink_copystream::msgbuffer, palloc(), pq_puttextmessage(), SendCopyOutResponse(), SendTablespaceList(), and SendXlogRecPtrResult().

◆ bbsink_copystream_begin_manifest()

static void bbsink_copystream_begin_manifest ( bbsink sink)
static

Definition at line 256 of file basebackup_copy.c.

257 {
259 
260  pq_beginmessage(&buf, 'd'); /* CopyData */
261  pq_sendbyte(&buf, 'm'); /* Manifest */
262  pq_endmessage(&buf);
263 }

References buf, pq_beginmessage(), pq_endmessage(), and pq_sendbyte().

◆ bbsink_copystream_cleanup()

static void bbsink_copystream_cleanup ( bbsink sink)
static

Definition at line 304 of file basebackup_copy.c.

305 {
306  /* Nothing to do. */
307 }

◆ bbsink_copystream_end_archive()

static void bbsink_copystream_end_archive ( bbsink sink)
static

◆ bbsink_copystream_end_backup()

static void bbsink_copystream_end_backup ( bbsink sink,
XLogRecPtr  endptr,
TimeLineID  endtli 
)
static

Definition at line 293 of file basebackup_copy.c.

295 {
296  SendCopyDone();
297  SendXlogRecPtrResult(endptr, endtli);
298 }
static void SendCopyDone(void)

References SendCopyDone(), and SendXlogRecPtrResult().

◆ bbsink_copystream_end_manifest()

static void bbsink_copystream_end_manifest ( bbsink sink)
static

Definition at line 284 of file basebackup_copy.c.

285 {
286  /* Do nothing. */
287 }

◆ bbsink_copystream_manifest_contents()

static void bbsink_copystream_manifest_contents ( bbsink sink,
size_t  len 
)
static

Definition at line 269 of file basebackup_copy.c.

270 {
271  bbsink_copystream *mysink = (bbsink_copystream *) sink;
272 
273  if (mysink->send_to_client)
274  {
275  /* Add one because we're also sending a leading type byte. */
276  pq_putmessage('d', mysink->msgbuffer, len + 1);
277  }
278 }

References len, bbsink_copystream::msgbuffer, pq_putmessage, and bbsink_copystream::send_to_client.

◆ bbsink_copystream_new()

bbsink* bbsink_copystream_new ( bool  send_to_client)

Definition at line 105 of file basebackup_copy.c.

106 {
108 
109  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
110  sink->send_to_client = send_to_client;
111 
112  /* Set up for periodic progress reporting. */
114  sink->bytes_done_at_last_time_check = UINT64CONST(0);
115 
116  return &sink->base;
117 }
static const bbsink_ops bbsink_copystream_ops
void * palloc0(Size size)
Definition: mcxt.c:1099
const bbsink_ops * bbs_ops

References bbsink_copystream::base, bbsink::bbs_ops, bbsink_copystream_ops, bbsink_copystream::bytes_done_at_last_time_check, GetCurrentTimestamp(), bbsink_copystream::last_progress_report_time, palloc0(), and bbsink_copystream::send_to_client.

Referenced by SendBaseBackup().

◆ send_int8_string()

static void send_int8_string ( StringInfoData buf,
int64  intval 
)
static

Definition at line 466 of file basebackup_copy.c.

467 {
468  char is[32];
469 
470  sprintf(is, INT64_FORMAT, intval);
471  pq_sendint32(buf, strlen(is));
472  pq_sendbytes(buf, is, strlen(is));
473 }
#define INT64_FORMAT
Definition: c.h:483
#define sprintf
Definition: port.h:227
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145

References buf, INT64_FORMAT, pq_sendbytes(), pq_sendint32(), and sprintf.

Referenced by SendTablespaceList().

◆ SendCopyDone()

static void SendCopyDone ( void  )
static

Definition at line 327 of file basebackup_copy.c.

328 {
329  pq_putemptymessage('c');
330 }
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390

References pq_putemptymessage().

Referenced by bbsink_copystream_end_backup().

◆ SendCopyOutResponse()

static void SendCopyOutResponse ( void  )
static

Definition at line 313 of file basebackup_copy.c.

314 {
316 
317  pq_beginmessage(&buf, 'H');
318  pq_sendbyte(&buf, 0); /* overall format */
319  pq_sendint16(&buf, 0); /* natts */
320  pq_endmessage(&buf);
321 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137

References buf, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), and pq_sendint16().

Referenced by bbsink_copystream_begin_backup().

◆ SendTablespaceList()

static void SendTablespaceList ( List tablespaces)
static

Definition at line 392 of file basebackup_copy.c.

393 {
395  ListCell *lc;
396 
397  /* Construct and send the directory information */
398  pq_beginmessage(&buf, 'T'); /* RowDescription */
399  pq_sendint16(&buf, 3); /* 3 fields */
400 
401  /* First field - spcoid */
402  pq_sendstring(&buf, "spcoid");
403  pq_sendint32(&buf, 0); /* table oid */
404  pq_sendint16(&buf, 0); /* attnum */
405  pq_sendint32(&buf, OIDOID); /* type oid */
406  pq_sendint16(&buf, 4); /* typlen */
407  pq_sendint32(&buf, 0); /* typmod */
408  pq_sendint16(&buf, 0); /* format code */
409 
410  /* Second field - spclocation */
411  pq_sendstring(&buf, "spclocation");
412  pq_sendint32(&buf, 0);
413  pq_sendint16(&buf, 0);
414  pq_sendint32(&buf, TEXTOID);
415  pq_sendint16(&buf, -1);
416  pq_sendint32(&buf, 0);
417  pq_sendint16(&buf, 0);
418 
419  /* Third field - size */
420  pq_sendstring(&buf, "size");
421  pq_sendint32(&buf, 0);
422  pq_sendint16(&buf, 0);
423  pq_sendint32(&buf, INT8OID);
424  pq_sendint16(&buf, 8);
425  pq_sendint32(&buf, 0);
426  pq_sendint16(&buf, 0);
427  pq_endmessage(&buf);
428 
429  foreach(lc, tablespaces)
430  {
431  tablespaceinfo *ti = lfirst(lc);
432 
433  /* Send one datarow message */
434  pq_beginmessage(&buf, 'D');
435  pq_sendint16(&buf, 3); /* number of columns */
436  if (ti->path == NULL)
437  {
438  pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
439  pq_sendint32(&buf, -1);
440  }
441  else
442  {
443  Size len;
444 
445  len = strlen(ti->oid);
446  pq_sendint32(&buf, len);
447  pq_sendbytes(&buf, ti->oid, len);
448 
449  len = strlen(ti->path);
450  pq_sendint32(&buf, len);
451  pq_sendbytes(&buf, ti->path, len);
452  }
453  if (ti->size >= 0)
454  send_int8_string(&buf, ti->size / 1024);
455  else
456  pq_sendint32(&buf, -1); /* NULL */
457 
458  pq_endmessage(&buf);
459  }
460 }
static void send_int8_string(StringInfoData *buf, int64 intval)
size_t Size
Definition: c.h:540
#define lfirst(lc)
Definition: pg_list.h:169

References buf, len, lfirst, tablespaceinfo::oid, tablespaceinfo::path, pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), send_int8_string(), and tablespaceinfo::size.

Referenced by bbsink_copystream_begin_backup().

◆ SendXlogRecPtrResult()

static void SendXlogRecPtrResult ( XLogRecPtr  ptr,
TimeLineID  tli 
)
static

Definition at line 337 of file basebackup_copy.c.

338 {
340  char str[MAXFNAMELEN];
341  Size len;
342 
343  pq_beginmessage(&buf, 'T'); /* RowDescription */
344  pq_sendint16(&buf, 2); /* 2 fields */
345 
346  /* Field headers */
347  pq_sendstring(&buf, "recptr");
348  pq_sendint32(&buf, 0); /* table oid */
349  pq_sendint16(&buf, 0); /* attnum */
350  pq_sendint32(&buf, TEXTOID); /* type oid */
351  pq_sendint16(&buf, -1);
352  pq_sendint32(&buf, 0);
353  pq_sendint16(&buf, 0);
354 
355  pq_sendstring(&buf, "tli");
356  pq_sendint32(&buf, 0); /* table oid */
357  pq_sendint16(&buf, 0); /* attnum */
358 
359  /*
360  * int8 may seem like a surprising data type for this, but in theory int4
361  * would not be wide enough for this, as TimeLineID is unsigned.
362  */
363  pq_sendint32(&buf, INT8OID); /* type oid */
364  pq_sendint16(&buf, -1);
365  pq_sendint32(&buf, 0);
366  pq_sendint16(&buf, 0);
367  pq_endmessage(&buf);
368 
369  /* Data row */
370  pq_beginmessage(&buf, 'D');
371  pq_sendint16(&buf, 2); /* number of columns */
372 
373  len = snprintf(str, sizeof(str),
374  "%X/%X", LSN_FORMAT_ARGS(ptr));
375  pq_sendint32(&buf, len);
376  pq_sendbytes(&buf, str, len);
377 
378  len = snprintf(str, sizeof(str), "%u", tli);
379  pq_sendint32(&buf, len);
380  pq_sendbytes(&buf, str, len);
381 
382  pq_endmessage(&buf);
383 
384  /* Send a CommandComplete message */
385  pq_puttextmessage('C', "SELECT");
386 }
#define snprintf
Definition: port.h:225
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References buf, len, LSN_FORMAT_ARGS, MAXFNAMELEN, pq_beginmessage(), pq_endmessage(), pq_puttextmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), pq_sendstring(), snprintf, and generate_unaccent_rules::str.

Referenced by bbsink_copystream_begin_backup(), and bbsink_copystream_end_backup().

Variable Documentation

◆ bbsink_copystream_ops

const bbsink_ops bbsink_copystream_ops
static
Initial value:
= {
}
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)

Definition at line 89 of file basebackup_copy.c.

Referenced by bbsink_copystream_new().