PostgreSQL Source Code  git master
basebackup_copy.c File Reference
#include "postgres.h"
#include "access/tupdesc.h"
#include "backup/basebackup.h"
#include "backup/basebackup_sink.h"
#include "catalog/pg_type_d.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "tcop/dest.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
Include dependency graph for basebackup_copy.c:

Go to the source code of this file.

Data Structures

struct  bbsink_copystream
 

Macros

#define PROGRESS_REPORT_BYTE_INTERVAL   65536
 
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000
 

Typedefs

typedef struct bbsink_copystream bbsink_copystream
 

Functions

static void bbsink_copystream_begin_backup (bbsink *sink)
 
static void bbsink_copystream_begin_archive (bbsink *sink, const char *archive_name)
 
static void bbsink_copystream_archive_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_archive (bbsink *sink)
 
static void bbsink_copystream_begin_manifest (bbsink *sink)
 
static void bbsink_copystream_manifest_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_manifest (bbsink *sink)
 
static void bbsink_copystream_end_backup (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
 
static void bbsink_copystream_cleanup (bbsink *sink)
 
static void SendCopyOutResponse (void)
 
static void SendCopyDone (void)
 
static void SendXlogRecPtrResult (XLogRecPtr ptr, TimeLineID tli)
 
static void SendTablespaceList (List *tablespaces)
 
bbsink * bbsink_copystream_new (bool send_to_client)
 

Variables

static const bbsink_ops bbsink_copystream_ops
 

Macro Definition Documentation

◆ PROGRESS_REPORT_BYTE_INTERVAL

#define PROGRESS_REPORT_BYTE_INTERVAL   65536

Definition at line 72 of file basebackup_copy.c.

◆ PROGRESS_REPORT_MILLISECOND_THRESHOLD

#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000

Definition at line 73 of file basebackup_copy.c.

Typedef Documentation

◆ bbsink_copystream

Function Documentation

◆ bbsink_copystream_archive_contents()

static void bbsink_copystream_archive_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 183 of file basebackup_copy.c.

184 {
185  bbsink_copystream *mysink = (bbsink_copystream *) sink;
186  bbsink_state *state = mysink->base.bbs_state;
187  StringInfoData buf;
188  uint64 targetbytes;
189 
190  /* Send the archive content to the client, if appropriate. */
191  if (mysink->send_to_client)
192  {
193  /* Add one because we're also sending a leading type byte. */
194  pq_putmessage('d', mysink->msgbuffer, len + 1);
195  }
196 
197  /* Consider whether to send a progress report to the client. */
198  targetbytes = mysink->bytes_done_at_last_time_check
199  + PROGRESS_REPORT_BYTE_INTERVAL;
200  if (targetbytes <= state->bytes_done)
201  {
202  TimestampTz now = GetCurrentTimestamp();
203  long ms;
204 
205  /*
206  * OK, we've sent a decent number of bytes, so check the system time
207  * to see whether we're due to send a progress report.
208  */
209  mysink->bytes_done_at_last_time_check = state->bytes_done;
210  ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
211  now);
212 
213  /*
214  * Send a progress report if enough time has passed. Also send one if
215  * the system clock was set backward, so that such occurrences don't
216  * have the effect of suppressing further progress messages.
217  */
218  if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
219  {
220  mysink->last_progress_report_time = now;
221 
222  pq_beginmessage(&buf, 'd'); /* CopyData */
223  pq_sendbyte(&buf, 'p'); /* Progress report */
224  pq_sendint64(&buf, state->bytes_done);
225  pq_endmessage(&buf);
226  pq_flush_if_writable();
227  }
228  }
229 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1687
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
int64 TimestampTz
Definition: timestamp.h:39
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
const void size_t len
static char * buf
Definition: pg_test_fsync.c:67
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
TimestampTz last_progress_report_time
uint64 bytes_done_at_last_time_check
bbsink_state * bbs_state
Definition: regguts.h:318

References bbsink_copystream::base, bbsink::bbs_state, buf, bbsink_copystream::bytes_done_at_last_time_check, GetCurrentTimestamp(), bbsink_copystream::last_progress_report_time, len, bbsink_copystream::msgbuffer, now(), pq_beginmessage(), pq_endmessage(), pq_flush_if_writable, pq_putmessage, pq_sendbyte(), pq_sendint64(), PROGRESS_REPORT_BYTE_INTERVAL, PROGRESS_REPORT_MILLISECOND_THRESHOLD, bbsink_copystream::send_to_client, and TimestampDifferenceMilliseconds().

◆ bbsink_copystream_begin_archive()

static void bbsink_copystream_begin_archive ( bbsink * sink,
const char *  archive_name 
)
static

Definition at line 165 of file basebackup_copy.c.

166 {
167  bbsink_state *state = sink->bbs_state;
168  tablespaceinfo *ti;
169  StringInfoData buf;
170 
171  ti = list_nth(state->tablespaces, state->tablespace_num);
172  pq_beginmessage(&buf, 'd'); /* CopyData */
173  pq_sendbyte(&buf, 'n'); /* New archive */
174  pq_sendstring(&buf, archive_name);
175  pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
176  pq_endmessage(&buf);
177 }
static void * list_nth(const List *list, int n)
Definition: pg_list.h:297
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197

References bbsink::bbs_state, buf, list_nth(), tablespaceinfo::path, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), and pq_sendstring().

◆ bbsink_copystream_begin_backup()

static void bbsink_copystream_begin_backup ( bbsink * sink)
static

Definition at line 126 of file basebackup_copy.c.

127 {
128  bbsink_copystream *mysink = (bbsink_copystream *) sink;
129  bbsink_state *state = sink->bbs_state;
130  char *buf;
131 
132  /*
133  * Initialize buffer. We ultimately want to send the archive and manifest
134  * data by means of CopyData messages where the payload portion of each
135  * message begins with a type byte. However, basebackup.c expects the
136  * buffer to be aligned, so we can't just allocate one extra byte for the
137  * type byte. Instead, allocate enough extra bytes that the portion of the
138  * buffer we reveal to our callers can be aligned, while leaving room to
139  * slip the type byte in just beforehand. That will allow us to ship the
140  * data with a single call to pq_putmessage and without needing any extra
141  * copying.
142  */
143  buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144  mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145  mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
146  mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
147 
148  /* Tell client the backup start location. */
149  SendXlogRecPtrResult(state->startptr, state->starttli);
150 
151  /* Send client a list of tablespaces. */
152  SendTablespaceList(state->tablespaces);
153 
154  /* Send a CommandComplete message */
155  pq_puttextmessage('C', "SELECT");
156 
157  /* Begin COPY stream. This will be used for all archives + manifest. */
158  SendCopyOutResponse();
159 }
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
void * palloc(Size size)
Definition: mcxt.c:1145
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
char * bbs_buffer
size_t bbs_buffer_length

References bbsink_copystream::base, bbsink::bbs_buffer, bbsink::bbs_buffer_length, bbsink::bbs_state, buf, bbsink_copystream::msgbuffer, palloc(), pq_puttextmessage(), SendCopyOutResponse(), SendTablespaceList(), and SendXlogRecPtrResult().

◆ bbsink_copystream_begin_manifest()

static void bbsink_copystream_begin_manifest ( bbsink * sink)
static

Definition at line 259 of file basebackup_copy.c.

260 {
261  StringInfoData buf;
262 
263  pq_beginmessage(&buf, 'd'); /* CopyData */
264  pq_sendbyte(&buf, 'm'); /* Manifest */
265  pq_endmessage(&buf);
266 }

References buf, pq_beginmessage(), pq_endmessage(), and pq_sendbyte().

◆ bbsink_copystream_cleanup()

static void bbsink_copystream_cleanup ( bbsink * sink)
static

Definition at line 307 of file basebackup_copy.c.

308 {
309  /* Nothing to do. */
310 }

◆ bbsink_copystream_end_archive()

static void bbsink_copystream_end_archive ( bbsink * sink)
static

◆ bbsink_copystream_end_backup()

static void bbsink_copystream_end_backup ( bbsink * sink,
XLogRecPtr  endptr,
TimeLineID  endtli 
)
static

Definition at line 296 of file basebackup_copy.c.

298 {
299  SendCopyDone();
300  SendXlogRecPtrResult(endptr, endtli);
301 }
static void SendCopyDone(void)

References SendCopyDone(), and SendXlogRecPtrResult().

◆ bbsink_copystream_end_manifest()

static void bbsink_copystream_end_manifest ( bbsink * sink)
static

Definition at line 287 of file basebackup_copy.c.

288 {
289  /* Do nothing. */
290 }

◆ bbsink_copystream_manifest_contents()

static void bbsink_copystream_manifest_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 272 of file basebackup_copy.c.

273 {
274  bbsink_copystream *mysink = (bbsink_copystream *) sink;
275 
276  if (mysink->send_to_client)
277  {
278  /* Add one because we're also sending a leading type byte. */
279  pq_putmessage('d', mysink->msgbuffer, len + 1);
280  }
281 }

References len, bbsink_copystream::msgbuffer, pq_putmessage, and bbsink_copystream::send_to_client.

◆ bbsink_copystream_new()

bbsink* bbsink_copystream_new ( bool  send_to_client)

Definition at line 108 of file basebackup_copy.c.

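109 {
110  bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
111 
112  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
113  sink->send_to_client = send_to_client;
114 
115  /* Set up for periodic progress reporting. */
116  sink->last_progress_report_time = GetCurrentTimestamp();
117  sink->bytes_done_at_last_time_check = UINT64CONST(0);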
118 
119  return &sink->base;
120 }
static const bbsink_ops bbsink_copystream_ops
void * palloc0(Size size)
Definition: mcxt.c:1176
const bbsink_ops * bbs_ops

References bbsink_copystream::base, bbsink::bbs_ops, bbsink_copystream_ops, bbsink_copystream::bytes_done_at_last_time_check, GetCurrentTimestamp(), bbsink_copystream::last_progress_report_time, palloc0(), and bbsink_copystream::send_to_client.

Referenced by SendBaseBackup().

◆ SendCopyDone()

static void SendCopyDone ( void  )
static

Definition at line 330 of file basebackup_copy.c.

331 {
332  pq_putemptymessage('c');
333 }
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390

References pq_putemptymessage().

Referenced by bbsink_copystream_end_backup().

◆ SendCopyOutResponse()

static void SendCopyOutResponse ( void  )
static

Definition at line 316 of file basebackup_copy.c.

317 {
318  StringInfoData buf;
319 
320  pq_beginmessage(&buf, 'H');
321  pq_sendbyte(&buf, 0); /* overall format */
322  pq_sendint16(&buf, 0); /* natts */
323  pq_endmessage(&buf);
324 }
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137

References buf, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), and pq_sendint16().

Referenced by bbsink_copystream_begin_backup().

◆ SendTablespaceList()

static void SendTablespaceList ( List * tablespaces)
static

Definition at line 376 of file basebackup_copy.c.

377 {
378  DestReceiver *dest;
379  TupOutputState *tstate;
380  TupleDesc tupdesc;
381  ListCell *lc;
382 
383  dest = CreateDestReceiver(DestRemoteSimple);
384 
385  tupdesc = CreateTemplateTupleDesc(3);
386  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
387  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
388  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
389 
390  /* send RowDescription */
391  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
392 
393  /* Construct and send the directory information */
394  foreach(lc, tablespaces)
395  {
396  tablespaceinfo *ti = lfirst(lc);
397  Datum values[3];
398  bool nulls[3] = {0};
399 
400  /* Send one datarow message */
401  if (ti->path == NULL)
402  {
403  nulls[0] = true;
404  nulls[1] = true;
405  }
406  else
407  {
408  values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
409  values[1] = CStringGetTextDatum(ti->path);
410  }
411  if (ti->size >= 0)
412  values[2] = Int64GetDatum(ti->size / 1024);
413  else
414  nulls[2] = true;
415 
416  do_tup_output(tstate, values, nulls);
417  }
418 
419  end_tup_output(tstate);
420 }
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:93
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
#define lfirst(lc)
Definition: pg_list.h:170
uintptr_t Datum
Definition: postgres.h:412
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657

References begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), Int64GetDatum(), lfirst, ObjectIdGetDatum(), tablespaceinfo::oid, tablespaceinfo::path, tablespaceinfo::size, TTSOpsVirtual, TupleDescInitBuiltinEntry(), and values.

Referenced by bbsink_copystream_begin_backup().

◆ SendXlogRecPtrResult()

static void SendXlogRecPtrResult ( XLogRecPtr  ptr,
TimeLineID  tli 
)
static

Definition at line 340 of file basebackup_copy.c.

341 {
342  DestReceiver *dest;
343  TupOutputState *tstate;
344  TupleDesc tupdesc;
345  Datum values[2];
346  bool nulls[2] = {0};
347 
348  dest = CreateDestReceiver(DestRemoteSimple);
349 
350  tupdesc = CreateTemplateTupleDesc(2);
351  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
352  /*
353  * int8 may seem like a surprising data type for this, but in theory int4
354  * would not be wide enough for this, as TimeLineID is unsigned.
355  */
356  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
357 
358  /* send RowDescription */
359  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
360 
361  /* Data row */
362  values[0] = CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
363  values[1] = Int64GetDatum(tli);
364  do_tup_output(tstate, values, nulls);
365 
366  end_tup_output(tstate);
367 
368  /* Send a CommandComplete message */
369  pq_puttextmessage('C', "SELECT");
370 }
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), Int64GetDatum(), LSN_FORMAT_ARGS, pq_puttextmessage(), psprintf(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), and values.

Referenced by bbsink_copystream_begin_backup(), and bbsink_copystream_end_backup().

Variable Documentation

◆ bbsink_copystream_ops

const bbsink_ops bbsink_copystream_ops
static
Initial value:
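= {
.begin_backup = bbsink_copystream_begin_backup,
.begin_archive = bbsink_copystream_begin_archive,
.archive_contents = bbsink_copystream_archive_contents,
.end_archive = bbsink_copystream_end_archive,
.begin_manifest = bbsink_copystream_begin_manifest,
.manifest_contents = bbsink_copystream_manifest_contents,
.end_manifest = bbsink_copystream_end_manifest,
.end_backup = bbsink_copystream_end_backup,
.cleanup = bbsink_copystream_cleanup
}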
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)

Definition at line 92 of file basebackup_copy.c.

Referenced by bbsink_copystream_new().