PostgreSQL Source Code  git master
receivelog.h File Reference
#include "access/xlogdefs.h"
#include "libpq-fe.h"
#include "walmethods.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

◆ stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 23 of file receivelog.h.

◆ StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn)

Definition at line 381 of file receivelog.c.

382 {
383  int minServerMajor,
384  maxServerMajor;
385  int serverMajor;
386 
387  /*
388  * The message format used in streaming replication changed in 9.3, so we
389  * cannot stream from older servers. And we don't support servers newer
390  * than the client; it might work, but we don't know, so err on the safe
391  * side.
392  */
393  minServerMajor = 903;
394  maxServerMajor = PG_VERSION_NUM / 100;
395  serverMajor = PQserverVersion(conn) / 100;
396  if (serverMajor < minServerMajor)
397  {
398  const char *serverver = PQparameterStatus(conn, "server_version");
399 
400  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
401  serverver ? serverver : "'unknown'",
402  "9.3");
403  return false;
404  }
405  else if (serverMajor > maxServerMajor)
406  {
407  const char *serverver = PQparameterStatus(conn, "server_version");
408 
409  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
410  serverver ? serverver : "'unknown'",
411  PG_VERSION);
412  return false;
413  }
414  return true;
415 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6735
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6760
#define pg_log_error(...)
Definition: logging.h:80
PGconn * conn
Definition: streamutil.c:54

References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream 
)

Definition at line 459 of file receivelog.c.

460 {
461  char query[128];
462  char slotcmd[128];
463  PGresult *res;
464  XLogRecPtr stoppos;
465 
466  /*
467  * The caller should've checked the server version already, but doesn't do
468  * any harm to check it here too.
469  */
470  if (!CheckServerVersionForStreaming(conn))
471  return false;
472 
473  /*
474  * Decide whether we want to report the flush position. If we report the
475  * flush position, the primary will know what WAL we'll possibly
476  * re-request, and it can then remove older WAL safely. We must always do
477  * that when we are using slots.
478  *
479  * Reporting the flush position makes one eligible as a synchronous
480  * replica. People shouldn't include generic names in
481  * synchronous_standby_names, but we've protected them against it so far,
482  * so let's continue to do so unless specifically requested.
483  */
484  if (stream->replication_slot != NULL)
485  {
486  reportFlushPosition = true;
487  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
488  }
489  else
490  {
491  if (stream->synchronous)
492  reportFlushPosition = true;
493  else
494  reportFlushPosition = false;
495  slotcmd[0] = 0;
496  }
497 
498  if (stream->sysidentifier != NULL)
499  {
500  char *sysidentifier = NULL;
501  TimeLineID servertli;
502 
503  /*
504  * Get the server system identifier and timeline, and validate them.
505  */
506  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
507  {
508  pg_free(sysidentifier);
509  return false;
510  }
511 
512  if (strcmp(stream->sysidentifier, sysidentifier) != 0)
513  {
514  pg_log_error("system identifier does not match between base backup and streaming connection");
515  pg_free(sysidentifier);
516  return false;
517  }
518  pg_free(sysidentifier);
519 
520  if (stream->timeline > servertli)
521  {
522  pg_log_error("starting timeline %u is not present in the server",
523  stream->timeline);
524  return false;
525  }
526  }
527 
528  /*
529  * initialize flush position to starting point, it's the caller's
530  * responsibility that that's sane.
531  */
532  lastFlushPosition = stream->startpos;
533 
534  while (1)
535  {
536  /*
537  * Fetch the timeline history file for this timeline, if we don't have
538  * it already. When streaming log to tar, this will always return
539  * false, as we are never streaming into an existing file and
540  * therefore there can be no pre-existing timeline history file.
541  */
542  if (!existsTimeLineHistoryFile(stream))
543  {
544  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
545  res = PQexec(conn, query);
546  if (PQresultStatus(res) != PGRES_TUPLES_OK)
547  {
548  /* FIXME: we might send it ok, but get an error */
549  pg_log_error("could not send replication command \"%s\": %s",
550  "TIMELINE_HISTORY", PQresultErrorMessage(res));
551  PQclear(res);
552  return false;
553  }
554 
555  /*
556  * The response to TIMELINE_HISTORY is a single row result set
557  * with two fields: filename and content
558  */
559  if (PQnfields(res) != 2 || PQntuples(res) != 1)
560  {
561  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
562  PQntuples(res), PQnfields(res), 1, 2);
563  }
564 
565  /* Write the history file to disk */
566  writeTimeLineHistoryFile(stream,
567  PQgetvalue(res, 0, 0),
568  PQgetvalue(res, 0, 1));
569 
570  PQclear(res);
571  }
572 
573  /*
574  * Before we start streaming from the requested location, check if the
575  * callback tells us to stop here.
576  */
577  if (stream->stream_stop(stream->startpos, stream->timeline, false))
578  return true;
579 
580  /* Initiate the replication stream at specified location */
581  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
582  slotcmd,
583  LSN_FORMAT_ARGS(stream->startpos),
584  stream->timeline);
585  res = PQexec(conn, query);
586  if (PQresultStatus(res) != PGRES_COPY_BOTH)
587  {
588  pg_log_error("could not send replication command \"%s\": %s",
589  "START_REPLICATION", PQresultErrorMessage(res));
590  PQclear(res);
591  return false;
592  }
593  PQclear(res);
594 
595  /* Stream the WAL */
596  res = HandleCopyStream(conn, stream, &stoppos);
597  if (res == NULL)
598  goto error;
599 
600  /*
601  * Streaming finished.
602  *
603  * There are two possible reasons for that: a controlled shutdown, or
604  * we reached the end of the current timeline. In case of
605  * end-of-timeline, the server sends a result set after Copy has
606  * finished, containing information about the next timeline. Read
607  * that, and restart streaming from the next timeline. In case of
608  * controlled shutdown, stop here.
609  */
610  if (PQresultStatus(res) == PGRES_TUPLES_OK)
611  {
612  /*
613  * End-of-timeline. Read the next timeline's ID and starting
614  * position. Usually, the starting position will match the end of
615  * the previous timeline, but there are corner cases like if the
616  * server had sent us half of a WAL record, when it was promoted.
617  * The new timeline will begin at the end of the last complete
618  * record in that case, overlapping the partial WAL record on the
619  * old timeline.
620  */
621  uint32 newtimeline;
622  bool parsed;
623 
624  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
625  PQclear(res);
626  if (!parsed)
627  goto error;
628 
629  /* Sanity check the values the server gave us */
630  if (newtimeline <= stream->timeline)
631  {
632  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
633  newtimeline, stream->timeline);
634  goto error;
635  }
636  if (stream->startpos > stoppos)
637  {
638  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
639  stream->timeline, LSN_FORMAT_ARGS(stoppos),
640  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
641  goto error;
642  }
643 
644  /* Read the final result, which should be CommandComplete. */
645  res = PQgetResult(conn);
646  if (PQresultStatus(res) != PGRES_COMMAND_OK)
647  {
648  pg_log_error("unexpected termination of replication stream: %s",
649  PQresultErrorMessage(res));
650  PQclear(res);
651  goto error;
652  }
653  PQclear(res);
654 
655  /*
656  * Loop back to start streaming from the new timeline. Always
657  * start streaming at the beginning of a segment.
658  */
659  stream->timeline = newtimeline;
660  stream->startpos = stream->startpos -
661  XLogSegmentOffset(stream->startpos, WalSegSz);
662  continue;
663  }
664  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
665  {
666  PQclear(res);
667 
668  /*
669  * End of replication (ie. controlled shut down of the server).
670  *
671  * Check if the callback thinks it's OK to stop here. If not,
672  * complain.
673  */
674  if (stream->stream_stop(stoppos, stream->timeline, false))
675  return true;
676  else
677  {
678  pg_log_error("replication stream was terminated before stop point");
679  goto error;
680  }
681  }
682  else
683  {
684  /* Server returned an error. */
685  pg_log_error("unexpected termination of replication stream: %s",
686  PQresultErrorMessage(res));
687  PQclear(res);
688  goto error;
689  }
690  }
691 
692 error:
693  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
694  pg_log_error("could not close file \"%s\": %s",
695  current_walfile_name, stream->walmethod->getlasterror());
696  walfile = NULL;
697  return false;
698 }
unsigned int uint32
Definition: c.h:441
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void PQclear(PGresult *res)
Definition: fe-exec.c:694
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3194
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3256
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:107
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:95
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:98
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:224
#define snprintf
Definition: port.h:222
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:751
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
static bool reportFlushPosition
Definition: receivelog.c:33
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:265
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:282
static Walfile * walfile
Definition: receivelog.c:31
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:381
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:705
static void error(void)
Definition: sql-dyntest.c:147
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:409
uint32 WalSegSz
Definition: streamutil.c:34
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool synchronous
Definition: receivelog.h:36
const char *(* getlasterror)(void)
Definition: walmethods.h:95
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:55
@ CLOSE_NO_RENAME
Definition: walmethods.h:19
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59

References CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, conn, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, res, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().