PostgreSQL Source Code  git master
receivelog.h File Reference
#include "access/xlogdefs.h"
#include "libpq-fe.h"
#include "walmethods.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

◆ stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 23 of file receivelog.h.

◆ StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 358 of file receivelog.c.

References pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

359 {
360  int minServerMajor,
361  maxServerMajor;
362  int serverMajor;
363 
364  /*
365  * The message format used in streaming replication changed in 9.3, so we
366  * cannot stream from older servers. And we don't support servers newer
367  * than the client; it might work, but we don't know, so err on the safe
368  * side.
369  */
370  minServerMajor = 903;
371  maxServerMajor = PG_VERSION_NUM / 100;
372  serverMajor = PQserverVersion(conn) / 100;
373  if (serverMajor < minServerMajor)
374  {
375  const char *serverver = PQparameterStatus(conn, "server_version");
376 
377  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
378  serverver ? serverver : "'unknown'",
379  "9.3");
380  return false;
381  }
382  else if (serverMajor > maxServerMajor)
383  {
384  const char *serverver = PQparameterStatus(conn, "server_version");
385 
386  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
387  serverver ? serverver : "'unknown'",
388  PG_VERSION);
389  return false;
390  }
391  return true;
392 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6588
#define pg_log_error(...)
Definition: logging.h:79
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6613

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream
)

Definition at line 436 of file receivelog.c.

References CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

437 {
438  char query[128];
439  char slotcmd[128];
440  PGresult *res;
441  XLogRecPtr stoppos;
442 
443  /*
444  * The caller should've checked the server version already, but doesn't do
445  * any harm to check it here too.
446  */
447  if (!CheckServerVersionForStreaming(conn))
448  return false;
449 
450  /*
451  * Decide whether we want to report the flush position. If we report the
452  * flush position, the primary will know what WAL we'll possibly
453  * re-request, and it can then remove older WAL safely. We must always do
454  * that when we are using slots.
455  *
456  * Reporting the flush position makes one eligible as a synchronous
457  * replica. People shouldn't include generic names in
458  * synchronous_standby_names, but we've protected them against it so far,
459  * so let's continue to do so unless specifically requested.
460  */
461  if (stream->replication_slot != NULL)
462  {
463  reportFlushPosition = true;
464  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
465  }
466  else
467  {
468  if (stream->synchronous)
469  reportFlushPosition = true;
470  else
471  reportFlushPosition = false;
472  slotcmd[0] = 0;
473  }
474 
475  if (stream->sysidentifier != NULL)
476  {
477  /* Validate system identifier hasn't changed */
478  res = PQexec(conn, "IDENTIFY_SYSTEM");
479  if (PQresultStatus(res) != PGRES_TUPLES_OK)
480  {
481  pg_log_error("could not send replication command \"%s\": %s",
482  "IDENTIFY_SYSTEM", PQerrorMessage(conn));
483  PQclear(res);
484  return false;
485  }
486  if (PQntuples(res) != 1 || PQnfields(res) < 3)
487  {
488  pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
489  PQntuples(res), PQnfields(res), 1, 3);
490  PQclear(res);
491  return false;
492  }
493  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
494  {
495  pg_log_error("system identifier does not match between base backup and streaming connection");
496  PQclear(res);
497  return false;
498  }
499  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
500  {
501  pg_log_error("starting timeline %u is not present in the server",
502  stream->timeline);
503  PQclear(res);
504  return false;
505  }
506  PQclear(res);
507  }
508 
509  /*
510  * initialize flush position to starting point, it's the caller's
511  * responsibility that that's sane.
512  */
513  lastFlushPosition = stream->startpos;
514 
515  while (1)
516  {
517  /*
518  * Fetch the timeline history file for this timeline, if we don't have
519  * it already. When streaming log to tar, this will always return
520  * false, as we are never streaming into an existing file and
521  * therefore there can be no pre-existing timeline history file.
522  */
523  if (!existsTimeLineHistoryFile(stream))
524  {
525  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
526  res = PQexec(conn, query);
527  if (PQresultStatus(res) != PGRES_TUPLES_OK)
528  {
529  /* FIXME: we might send it ok, but get an error */
530  pg_log_error("could not send replication command \"%s\": %s",
531  "TIMELINE_HISTORY", PQresultErrorMessage(res));
532  PQclear(res);
533  return false;
534  }
535 
536  /*
537  * The response to TIMELINE_HISTORY is a single row result set
538  * with two fields: filename and content
539  */
540  if (PQnfields(res) != 2 || PQntuples(res) != 1)
541  {
542  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
543  PQntuples(res), PQnfields(res), 1, 2);
544  }
545 
546  /* Write the history file to disk */
547  writeTimeLineHistoryFile(stream,
548  PQgetvalue(res, 0, 0),
549  PQgetvalue(res, 0, 1));
550 
551  PQclear(res);
552  }
553 
554  /*
555  * Before we start streaming from the requested location, check if the
556  * callback tells us to stop here.
557  */
558  if (stream->stream_stop(stream->startpos, stream->timeline, false))
559  return true;
560 
561  /* Initiate the replication stream at specified location */
562  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
563  slotcmd,
564  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
565  stream->timeline);
566  res = PQexec(conn, query);
567  if (PQresultStatus(res) != PGRES_COPY_BOTH)
568  {
569  pg_log_error("could not send replication command \"%s\": %s",
570  "START_REPLICATION", PQresultErrorMessage(res));
571  PQclear(res);
572  return false;
573  }
574  PQclear(res);
575 
576  /* Stream the WAL */
577  res = HandleCopyStream(conn, stream, &stoppos);
578  if (res == NULL)
579  goto error;
580 
581  /*
582  * Streaming finished.
583  *
584  * There are two possible reasons for that: a controlled shutdown, or
585  * we reached the end of the current timeline. In case of
586  * end-of-timeline, the server sends a result set after Copy has
587  * finished, containing information about the next timeline. Read
588  * that, and restart streaming from the next timeline. In case of
589  * controlled shutdown, stop here.
590  */
591  if (PQresultStatus(res) == PGRES_TUPLES_OK)
592  {
593  /*
594  * End-of-timeline. Read the next timeline's ID and starting
595  * position. Usually, the starting position will match the end of
596  * the previous timeline, but there are corner cases like if the
597  * server had sent us half of a WAL record, when it was promoted.
598  * The new timeline will begin at the end of the last complete
599  * record in that case, overlapping the partial WAL record on the
600  * old timeline.
601  */
602  uint32 newtimeline;
603  bool parsed;
604 
605  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
606  PQclear(res);
607  if (!parsed)
608  goto error;
609 
610  /* Sanity check the values the server gave us */
611  if (newtimeline <= stream->timeline)
612  {
613  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
614  newtimeline, stream->timeline);
615  goto error;
616  }
617  if (stream->startpos > stoppos)
618  {
619  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
620  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
621  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
622  goto error;
623  }
624 
625  /* Read the final result, which should be CommandComplete. */
626  res = PQgetResult(conn);
627  if (PQresultStatus(res) != PGRES_COMMAND_OK)
628  {
629  pg_log_error("unexpected termination of replication stream: %s",
630  PQresultErrorMessage(res));
631  PQclear(res);
632  goto error;
633  }
634  PQclear(res);
635 
636  /*
637  * Loop back to start streaming from the new timeline. Always
638  * start streaming at the beginning of a segment.
639  */
640  stream->timeline = newtimeline;
641  stream->startpos = stream->startpos -
642  XLogSegmentOffset(stream->startpos, WalSegSz);
643  continue;
644  }
645  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
646  {
647  PQclear(res);
648 
649  /*
650  * End of replication (ie. controlled shut down of the server).
651  *
652  * Check if the callback thinks it's OK to stop here. If not,
653  * complain.
654  */
655  if (stream->stream_stop(stoppos, stream->timeline, false))
656  return true;
657  else
658  {
659  pg_log_error("replication stream was terminated before stop point");
660  goto error;
661  }
662  }
663  else
664  {
665  /* Server returned an error. */
666  pg_log_error("unexpected termination of replication stream: %s",
667  PQresultErrorMessage(res));
668  PQclear(res);
669  goto error;
670  }
671  }
672 
673 error:
674  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
675  pg_log_error("could not close file \"%s\": %s",
676  current_walfile_name, stream->walmethod->getlasterror());
677  walfile = NULL;
678  return false;
679 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2777
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3163
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
#define pg_log_error(...)
Definition: logging.h:79
char * sysidentifier
Definition: receivelog.h:33
XLogRecPtr startpos
Definition: receivelog.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2769
TimeLineID timeline
Definition: receivelog.h:32
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
const char *(* getlasterror)(void)
Definition: walmethods.h:78
#define sprintf
Definition: port.h:194
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:686
static Walfile * walfile
Definition: receivelog.c:31
char * replication_slot
Definition: receivelog.h:48
static bool reportFlushPosition
Definition: receivelog.c:33
unsigned int uint32
Definition: c.h:359
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
void PQclear(PGresult *res)
Definition: fe-exec.c:694
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:732
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:242
bool synchronous
Definition: receivelog.h:36
int WalSegSz
Definition: pg_standby.c:38
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2708
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:192
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:259
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:358