PostgreSQL Source Code git master
Loading...
Searching...
No Matches
basebackup_copy.c File Reference
#include "postgres.h"
#include "access/tupdesc.h"
#include "backup/basebackup.h"
#include "backup/basebackup_sink.h"
#include "catalog/pg_type_d.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "tcop/dest.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
Include dependency graph for basebackup_copy.c:

Go to the source code of this file.

Data Structures

struct  bbsink_copystream
 

Macros

#define PROGRESS_REPORT_BYTE_INTERVAL   65536
 
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000
 

Typedefs

typedef struct bbsink_copystream bbsink_copystream
 

Functions

static void bbsink_copystream_begin_backup (bbsink *sink)
 
static void bbsink_copystream_begin_archive (bbsink *sink, const char *archive_name)
 
static void bbsink_copystream_archive_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_archive (bbsink *sink)
 
static void bbsink_copystream_begin_manifest (bbsink *sink)
 
static void bbsink_copystream_manifest_contents (bbsink *sink, size_t len)
 
static void bbsink_copystream_end_manifest (bbsink *sink)
 
static void bbsink_copystream_end_backup (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
 
static void bbsink_copystream_cleanup (bbsink *sink)
 
static void SendCopyOutResponse (void)
 
static void SendCopyDone (void)
 
static void SendXlogRecPtrResult (XLogRecPtr ptr, TimeLineID tli)
 
static void SendTablespaceList (List *tablespaces)
 
bbsink * bbsink_copystream_new (bool send_to_client)
 

Variables

static const bbsink_ops bbsink_copystream_ops
 

Macro Definition Documentation

◆ PROGRESS_REPORT_BYTE_INTERVAL

#define PROGRESS_REPORT_BYTE_INTERVAL   65536

Definition at line 72 of file basebackup_copy.c.

◆ PROGRESS_REPORT_MILLISECOND_THRESHOLD

#define PROGRESS_REPORT_MILLISECOND_THRESHOLD   1000

Definition at line 73 of file basebackup_copy.c.

Typedef Documentation

◆ bbsink_copystream

Function Documentation

◆ bbsink_copystream_archive_contents()

static void bbsink_copystream_archive_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 183 of file basebackup_copy.c.

184{
186 bbsink_state *state = mysink->base.bbs_state;
189
190 /* Send the archive content to the client, if appropriate. */
191 if (mysink->send_to_client)
192 {
193 /* Add one because we're also sending a leading type byte. */
194 pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
195 }
196
197 /* Consider whether to send a progress report to the client. */
198 targetbytes = mysink->bytes_done_at_last_time_check
200 if (targetbytes <= state->bytes_done)
201 {
203 long ms;
204
205 /*
206 * OK, we've sent a decent number of bytes, so check the system time
207 * to see whether we're due to send a progress report.
208 */
209 mysink->bytes_done_at_last_time_check = state->bytes_done;
210 ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
211 now);
212
213 /*
214 * Send a progress report if enough time has passed. Also send one if
215 * the system clock was set backward, so that such occurrences don't
216 * have the effect of suppressing further progress messages.
217 */
219 now < mysink->last_progress_report_time)
220 {
221 mysink->last_progress_report_time = now;
222
225 pq_sendint64(&buf, state->bytes_done);
228 }
229 }
230}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD
#define PROGRESS_REPORT_BYTE_INTERVAL
uint64_t uint64
Definition c.h:547
int64 TimestampTz
Definition timestamp.h:39
#define pq_flush_if_writable()
Definition libpq.h:47
#define pq_putmessage(msgtype, s, len)
Definition libpq.h:49
const void size_t len
static char buf[DEFAULT_XLOG_SEG_SIZE]
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
static int fb(int x)
#define PqMsg_CopyData
Definition protocol.h:65
#define PqBackupMsg_ProgressReport
Definition protocol.h:91

References buf, fb(), GetCurrentTimestamp(), len, now(), pq_beginmessage(), pq_endmessage(), pq_flush_if_writable, pq_putmessage, pq_sendbyte(), pq_sendint64(), PqBackupMsg_ProgressReport, PqMsg_CopyData, PROGRESS_REPORT_BYTE_INTERVAL, PROGRESS_REPORT_MILLISECOND_THRESHOLD, and TimestampDifferenceMilliseconds().

◆ bbsink_copystream_begin_archive()

static void bbsink_copystream_begin_archive ( bbsink * sink,
const char * archive_name 
)
static

Definition at line 165 of file basebackup_copy.c.

166{
167 bbsink_state *state = sink->bbs_state;
170
171 ti = list_nth(state->tablespaces, state->tablespace_num);
174 pq_sendstring(&buf, archive_name);
175 pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
177}
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
void pq_sendstring(StringInfo buf, const char *str)
Definition pqformat.c:195
#define PqBackupMsg_NewArchive
Definition protocol.h:90

References buf, fb(), list_nth(), pq_beginmessage(), pq_endmessage(), pq_sendbyte(), pq_sendstring(), PqBackupMsg_NewArchive, and PqMsg_CopyData.

◆ bbsink_copystream_begin_backup()

static void bbsink_copystream_begin_backup ( bbsink * sink)
static

Definition at line 126 of file basebackup_copy.c.

127{
129 bbsink_state *state = sink->bbs_state;
130 char *buf;
131
132 /*
133 * Initialize buffer. We ultimately want to send the archive and manifest
134 * data by means of CopyData messages where the payload portion of each
135 * message begins with a type byte. However, basebackup.c expects the
136 * buffer to be aligned, so we can't just allocate one extra byte for the
137 * type byte. Instead, allocate enough extra bytes that the portion of the
138 * buffer we reveal to our callers can be aligned, while leaving room to
139 * slip the type byte in just beforehand. That will allow us to ship the
140 * data with a single call to pq_putmessage and without needing any extra
141 * copying.
142 */
143 buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144 mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145 mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
146 mysink->msgbuffer[0] = PqMsg_CopyData; /* archive or manifest data */
147
148 /* Tell client the backup start location. */
149 SendXlogRecPtrResult(state->startptr, state->starttli);
150
151 /* Send client a list of tablespaces. */
152 SendTablespaceList(state->tablespaces);
153
154 /* Send a CommandComplete message */
156
157 /* Begin COPY stream. This will be used for all archives + manifest. */
159}
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void SendTablespaceList(List *tablespaces)
static void SendCopyOutResponse(void)
void * palloc(Size size)
Definition mcxt.c:1387
void pq_puttextmessage(char msgtype, const char *str)
Definition pqformat.c:366
#define PqMsg_CommandComplete
Definition protocol.h:42

References buf, fb(), palloc(), pq_puttextmessage(), PqMsg_CommandComplete, PqMsg_CopyData, SendCopyOutResponse(), SendTablespaceList(), and SendXlogRecPtrResult().

◆ bbsink_copystream_begin_manifest()

static void bbsink_copystream_begin_manifest ( bbsink * sink)
static

◆ bbsink_copystream_cleanup()

static void bbsink_copystream_cleanup ( bbsink * sink)
static

Definition at line 308 of file basebackup_copy.c.

309{
310 /* Nothing to do. */
311}

◆ bbsink_copystream_end_archive()

static void bbsink_copystream_end_archive ( bbsink * sink)
static

Definition at line 241 of file basebackup_copy.c.

242{
244 bbsink_state *state = mysink->base.bbs_state;
246
247 mysink->bytes_done_at_last_time_check = state->bytes_done;
248 mysink->last_progress_report_time = GetCurrentTimestamp();
251 pq_sendint64(&buf, state->bytes_done);
254}

References buf, fb(), GetCurrentTimestamp(), pq_beginmessage(), pq_endmessage(), pq_flush_if_writable, pq_sendbyte(), pq_sendint64(), PqBackupMsg_ProgressReport, and PqMsg_CopyData.

◆ bbsink_copystream_end_backup()

static void bbsink_copystream_end_backup ( bbsink * sink,
XLogRecPtr  endptr,
TimeLineID  endtli 
)
static

Definition at line 297 of file basebackup_copy.c.

299{
300 SendCopyDone();
302}
static void SendCopyDone(void)

References fb(), SendCopyDone(), and SendXlogRecPtrResult().

◆ bbsink_copystream_end_manifest()

static void bbsink_copystream_end_manifest ( bbsink * sink)
static

Definition at line 288 of file basebackup_copy.c.

289{
290 /* Do nothing. */
291}

◆ bbsink_copystream_manifest_contents()

static void bbsink_copystream_manifest_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 273 of file basebackup_copy.c.

274{
276
277 if (mysink->send_to_client)
278 {
279 /* Add one because we're also sending a leading type byte. */
280 pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
281 }
282}

References fb(), len, pq_putmessage, and PqMsg_CopyData.

◆ bbsink_copystream_new()

bbsink * bbsink_copystream_new ( bool  send_to_client)

Definition at line 108 of file basebackup_copy.c.

109{
111
112 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
113 sink->send_to_client = send_to_client;
114
115 /* Set up for periodic progress reporting. */
116 sink->last_progress_report_time = GetCurrentTimestamp();
117 sink->bytes_done_at_last_time_check = UINT64CONST(0);
118
119 return &sink->base;
120}
static const bbsink_ops bbsink_copystream_ops
#define UINT64CONST(x)
Definition c.h:561
#define palloc0_object(type)
Definition fe_memutils.h:75

References bbsink_copystream_ops, fb(), GetCurrentTimestamp(), palloc0_object, and UINT64CONST.

Referenced by SendBaseBackup().

◆ SendCopyDone()

static void SendCopyDone ( void  )
static

Definition at line 331 of file basebackup_copy.c.

332{
334}
void pq_putemptymessage(char msgtype)
Definition pqformat.c:387
#define PqMsg_CopyDone
Definition protocol.h:64

References pq_putemptymessage(), and PqMsg_CopyDone.

Referenced by bbsink_copystream_end_backup().

◆ SendCopyOutResponse()

static void SendCopyOutResponse ( void  )
static

Definition at line 317 of file basebackup_copy.c.

318{
320
322 pq_sendbyte(&buf, 0); /* overall format */
323 pq_sendint16(&buf, 0); /* natts */
325}
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
#define PqMsg_CopyOutResponse
Definition protocol.h:46

References buf, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), pq_sendint16(), and PqMsg_CopyOutResponse.

Referenced by bbsink_copystream_begin_backup().

◆ SendTablespaceList()

static void SendTablespaceList ( List * tablespaces)
static

Definition at line 378 of file basebackup_copy.c.

379{
382 TupleDesc tupdesc;
383 ListCell *lc;
384
386
387 tupdesc = CreateTemplateTupleDesc(3);
388 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
389 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
390 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
391
392 /* send RowDescription */
394
395 /* Construct and send the directory information */
396 foreach(lc, tablespaces)
397 {
399 Datum values[3];
400 bool nulls[3] = {0};
401
402 /* Send one datarow message */
403 if (ti->path == NULL)
404 {
405 nulls[0] = true;
406 nulls[1] = true;
407 }
408 else
409 {
410 values[0] = ObjectIdGetDatum(ti->oid);
411 values[1] = CStringGetTextDatum(ti->path);
412 }
413 if (ti->size >= 0)
414 values[2] = Int64GetDatum(ti->size / 1024);
415 else
416 nulls[2] = true;
417
418 do_tup_output(tstate, values, nulls);
419 }
420
422}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define lfirst(lc)
Definition pg_list.h:172
static Datum Int64GetDatum(int64 X)
Definition postgres.h:423
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:182
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:918

References begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), Int64GetDatum(), lfirst, ObjectIdGetDatum(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), and values.

Referenced by bbsink_copystream_begin_backup().

◆ SendXlogRecPtrResult()

static void SendXlogRecPtrResult ( XLogRecPtr  ptr,
TimeLineID  tli 
)
static

Definition at line 341 of file basebackup_copy.c.

342{
345 TupleDesc tupdesc;
346 Datum values[2];
347 bool nulls[2] = {0};
348
350
351 tupdesc = CreateTemplateTupleDesc(2);
352 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
353
354 /*
355 * int8 may seem like a surprising data type for this, but in theory int4
356 * would not be wide enough for this, as TimeLineID is unsigned.
357 */
358 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
359
360 /* send RowDescription */
362
363 /* Data row */
364 values[0] = CStringGetTextDatum(psprintf("%X/%08X", LSN_FORMAT_ARGS(ptr)));
365 values[1] = Int64GetDatum(tli);
366 do_tup_output(tstate, values, nulls);
367
369
370 /* Send a CommandComplete message */
372}
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47

References begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), Int64GetDatum(), LSN_FORMAT_ARGS, pq_puttextmessage(), PqMsg_CommandComplete, psprintf(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), and values.

Referenced by bbsink_copystream_begin_backup(), and bbsink_copystream_end_backup().

Variable Documentation

◆ bbsink_copystream_ops

const bbsink_ops bbsink_copystream_ops
static
Initial value:
= {
}
static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
static void bbsink_copystream_cleanup(bbsink *sink)
static void bbsink_copystream_archive_contents(bbsink *sink, size_t len)
static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
static void bbsink_copystream_end_archive(bbsink *sink)
static void bbsink_copystream_begin_manifest(bbsink *sink)
static void bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_copystream_begin_backup(bbsink *sink)
static void bbsink_copystream_end_manifest(bbsink *sink)

Definition at line 92 of file basebackup_copy.c.

92 {
93 .begin_backup = bbsink_copystream_begin_backup,
94 .begin_archive = bbsink_copystream_begin_archive,
95 .archive_contents = bbsink_copystream_archive_contents,
96 .end_archive = bbsink_copystream_end_archive,
97 .begin_manifest = bbsink_copystream_begin_manifest,
98 .manifest_contents = bbsink_copystream_manifest_contents,
99 .end_manifest = bbsink_copystream_end_manifest,
100 .end_backup = bbsink_copystream_end_backup,
102};

Referenced by bbsink_copystream_new().