PostgreSQL Source Code  git master
receivelog.h File Reference
#include "access/xlogdefs.h"
#include "libpq-fe.h"
#include "walmethods.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

◆ stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 23 of file receivelog.h.

◆ StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn)

Definition at line 376 of file receivelog.c.

References pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

377 {
378  int minServerMajor,
379  maxServerMajor;
380  int serverMajor;
381 
382  /*
383  * The message format used in streaming replication changed in 9.3, so we
384  * cannot stream from older servers. And we don't support servers newer
385  * than the client; it might work, but we don't know, so err on the safe
386  * side.
387  */
388  minServerMajor = 903;
389  maxServerMajor = PG_VERSION_NUM / 100;
390  serverMajor = PQserverVersion(conn) / 100;
391  if (serverMajor < minServerMajor)
392  {
393  const char *serverver = PQparameterStatus(conn, "server_version");
394 
395  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
396  serverver ? serverver : "'unknown'",
397  "9.3");
398  return false;
399  }
400  else if (serverMajor > maxServerMajor)
401  {
402  const char *serverver = PQparameterStatus(conn, "server_version");
403 
404  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
405  serverver ? serverver : "'unknown'",
406  PG_VERSION);
407  return false;
408  }
409  return true;
410 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6709
#define pg_log_error(...)
Definition: logging.h:80
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6734

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream 
)

Definition at line 454 of file receivelog.c.

References CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

455 {
456  char query[128];
457  char slotcmd[128];
458  PGresult *res;
459  XLogRecPtr stoppos;
460 
461  /*
462  * The caller should've checked the server version already, but doesn't do
463  * any harm to check it here too.
464  */
466  return false;
467 
468  /*
469  * Decide whether we want to report the flush position. If we report the
470  * flush position, the primary will know what WAL we'll possibly
471  * re-request, and it can then remove older WAL safely. We must always do
472  * that when we are using slots.
473  *
474  * Reporting the flush position makes one eligible as a synchronous
475  * replica. People shouldn't include generic names in
476  * synchronous_standby_names, but we've protected them against it so far,
477  * so let's continue to do so unless specifically requested.
478  */
479  if (stream->replication_slot != NULL)
480  {
481  reportFlushPosition = true;
482  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
483  }
484  else
485  {
486  if (stream->synchronous)
487  reportFlushPosition = true;
488  else
489  reportFlushPosition = false;
490  slotcmd[0] = 0;
491  }
492 
493  if (stream->sysidentifier != NULL)
494  {
495  char *sysidentifier = NULL;
496  TimeLineID servertli;
497 
498  /*
499  * Get the server system identifier and timeline, and validate them.
500  */
501  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
502  {
503  pg_free(sysidentifier);
504  return false;
505  }
506 
507  if (strcmp(stream->sysidentifier, sysidentifier) != 0)
508  {
509  pg_log_error("system identifier does not match between base backup and streaming connection");
510  pg_free(sysidentifier);
511  return false;
512  }
513  pg_free(sysidentifier);
514 
515  if (stream->timeline > servertli)
516  {
517  pg_log_error("starting timeline %u is not present in the server",
518  stream->timeline);
519  return false;
520  }
521  }
522 
523  /*
524  * initialize flush position to starting point, it's the caller's
525  * responsibility that that's sane.
526  */
527  lastFlushPosition = stream->startpos;
528 
529  while (1)
530  {
531  /*
532  * Fetch the timeline history file for this timeline, if we don't have
533  * it already. When streaming log to tar, this will always return
534  * false, as we are never streaming into an existing file and
535  * therefore there can be no pre-existing timeline history file.
536  */
537  if (!existsTimeLineHistoryFile(stream))
538  {
539  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
540  res = PQexec(conn, query);
541  if (PQresultStatus(res) != PGRES_TUPLES_OK)
542  {
543  /* FIXME: we might send it ok, but get an error */
544  pg_log_error("could not send replication command \"%s\": %s",
545  "TIMELINE_HISTORY", PQresultErrorMessage(res));
546  PQclear(res);
547  return false;
548  }
549 
550  /*
551  * The response to TIMELINE_HISTORY is a single row result set
552  * with two fields: filename and content
553  */
554  if (PQnfields(res) != 2 || PQntuples(res) != 1)
555  {
556  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
557  PQntuples(res), PQnfields(res), 1, 2);
558  }
559 
560  /* Write the history file to disk */
562  PQgetvalue(res, 0, 0),
563  PQgetvalue(res, 0, 1));
564 
565  PQclear(res);
566  }
567 
568  /*
569  * Before we start streaming from the requested location, check if the
570  * callback tells us to stop here.
571  */
572  if (stream->stream_stop(stream->startpos, stream->timeline, false))
573  return true;
574 
575  /* Initiate the replication stream at specified location */
576  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
577  slotcmd,
578  LSN_FORMAT_ARGS(stream->startpos),
579  stream->timeline);
580  res = PQexec(conn, query);
581  if (PQresultStatus(res) != PGRES_COPY_BOTH)
582  {
583  pg_log_error("could not send replication command \"%s\": %s",
584  "START_REPLICATION", PQresultErrorMessage(res));
585  PQclear(res);
586  return false;
587  }
588  PQclear(res);
589 
590  /* Stream the WAL */
591  res = HandleCopyStream(conn, stream, &stoppos);
592  if (res == NULL)
593  goto error;
594 
595  /*
596  * Streaming finished.
597  *
598  * There are two possible reasons for that: a controlled shutdown, or
599  * we reached the end of the current timeline. In case of
600  * end-of-timeline, the server sends a result set after Copy has
601  * finished, containing information about the next timeline. Read
602  * that, and restart streaming from the next timeline. In case of
603  * controlled shutdown, stop here.
604  */
605  if (PQresultStatus(res) == PGRES_TUPLES_OK)
606  {
607  /*
608  * End-of-timeline. Read the next timeline's ID and starting
609  * position. Usually, the starting position will match the end of
610  * the previous timeline, but there are corner cases like if the
611  * server had sent us half of a WAL record, when it was promoted.
612  * The new timeline will begin at the end of the last complete
613  * record in that case, overlapping the partial WAL record on the
614  * old timeline.
615  */
616  uint32 newtimeline;
617  bool parsed;
618 
619  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
620  PQclear(res);
621  if (!parsed)
622  goto error;
623 
624  /* Sanity check the values the server gave us */
625  if (newtimeline <= stream->timeline)
626  {
627  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
628  newtimeline, stream->timeline);
629  goto error;
630  }
631  if (stream->startpos > stoppos)
632  {
633  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
634  stream->timeline, LSN_FORMAT_ARGS(stoppos),
635  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
636  goto error;
637  }
638 
639  /* Read the final result, which should be CommandComplete. */
640  res = PQgetResult(conn);
641  if (PQresultStatus(res) != PGRES_COMMAND_OK)
642  {
643  pg_log_error("unexpected termination of replication stream: %s",
644  PQresultErrorMessage(res));
645  PQclear(res);
646  goto error;
647  }
648  PQclear(res);
649 
650  /*
651  * Loop back to start streaming from the new timeline. Always
652  * start streaming at the beginning of a segment.
653  */
654  stream->timeline = newtimeline;
655  stream->startpos = stream->startpos -
657  continue;
658  }
659  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
660  {
661  PQclear(res);
662 
663  /*
664  * End of replication (ie. controlled shut down of the server).
665  *
666  * Check if the callback thinks it's OK to stop here. If not,
667  * complain.
668  */
669  if (stream->stream_stop(stoppos, stream->timeline, false))
670  return true;
671  else
672  {
673  pg_log_error("replication stream was terminated before stop point");
674  goto error;
675  }
676  }
677  else
678  {
679  /* Server returned an error. */
680  pg_log_error("unexpected termination of replication stream: %s",
681  PQresultErrorMessage(res));
682  PQclear(res);
683  goto error;
684  }
685  }
686 
687 error:
688  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
689  pg_log_error("could not close file \"%s\": %s",
691  walfile = NULL;
692  return false;
693 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3256
uint32 TimeLineID
Definition: xlogdefs.h:59
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
#define pg_log_error(...)
Definition: logging.h:80
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:409
char * sysidentifier
Definition: receivelog.h:33
XLogRecPtr startpos
Definition: receivelog.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
TimeLineID timeline
Definition: receivelog.h:32
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
const char *(* getlasterror)(void)
Definition: walmethods.h:87
#define sprintf
Definition: port.h:218
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:700
static Walfile * walfile
Definition: receivelog.c:31
char * replication_slot
Definition: receivelog.h:48
static bool reportFlushPosition
Definition: receivelog.c:33
unsigned int uint32
Definition: c.h:441
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
void PQclear(PGresult *res)
Definition: fe-exec.c:694
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:746
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:260
bool synchronous
Definition: receivelog.h:36
void pg_free(void *ptr)
Definition: fe_memutils.c:105
uint32 WalSegSz
Definition: streamutil.c:34
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3194
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:216
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:277
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:376