PostgreSQL Source Code  git master
receivelog.h File Reference
#include "access/xlogdefs.h"
#include "libpq-fe.h"
#include "walmethods.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

◆ stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 23 of file receivelog.h.

◆ StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 357 of file receivelog.c.

References pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

358 {
359  int minServerMajor,
360  maxServerMajor;
361  int serverMajor;
362 
363  /*
364  * The message format used in streaming replication changed in 9.3, so we
365  * cannot stream from older servers. And we don't support servers newer
366  * than the client; it might work, but we don't know, so err on the safe
367  * side.
368  */
369  minServerMajor = 903;
370  maxServerMajor = PG_VERSION_NUM / 100;
371  serverMajor = PQserverVersion(conn) / 100;
372  if (serverMajor < minServerMajor)
373  {
374  const char *serverver = PQparameterStatus(conn, "server_version");
375 
376  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
377  serverver ? serverver : "'unknown'",
378  "9.3");
379  return false;
380  }
381  else if (serverMajor > maxServerMajor)
382  {
383  const char *serverver = PQparameterStatus(conn, "server_version");
384 
385  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
386  serverver ? serverver : "'unknown'",
387  PG_VERSION);
388  return false;
389  }
390  return true;
391 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6700
#define pg_log_error(...)
Definition: logging.h:80
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6725

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream 
)

Definition at line 435 of file receivelog.c.

References CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

436 {
437  char query[128];
438  char slotcmd[128];
439  PGresult *res;
440  XLogRecPtr stoppos;
441 
442  /*
443  * The caller should've checked the server version already, but doesn't do
444  * any harm to check it here too.
445  */
446  if (!CheckServerVersionForStreaming(conn))
447  return false;
448 
449  /*
450  * Decide whether we want to report the flush position. If we report the
451  * flush position, the primary will know what WAL we'll possibly
452  * re-request, and it can then remove older WAL safely. We must always do
453  * that when we are using slots.
454  *
455  * Reporting the flush position makes one eligible as a synchronous
456  * replica. People shouldn't include generic names in
457  * synchronous_standby_names, but we've protected them against it so far,
458  * so let's continue to do so unless specifically requested.
459  */
460  if (stream->replication_slot != NULL)
461  {
462  reportFlushPosition = true;
463  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
464  }
465  else
466  {
467  if (stream->synchronous)
468  reportFlushPosition = true;
469  else
470  reportFlushPosition = false;
471  slotcmd[0] = 0;
472  }
473 
474  if (stream->sysidentifier != NULL)
475  {
476  /* Validate system identifier hasn't changed */
477  res = PQexec(conn, "IDENTIFY_SYSTEM");
478  if (PQresultStatus(res) != PGRES_TUPLES_OK)
479  {
480  pg_log_error("could not send replication command \"%s\": %s",
481  "IDENTIFY_SYSTEM", PQerrorMessage(conn));
482  PQclear(res);
483  return false;
484  }
485  if (PQntuples(res) != 1 || PQnfields(res) < 3)
486  {
487  pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
488  PQntuples(res), PQnfields(res), 1, 3);
489  PQclear(res);
490  return false;
491  }
492  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
493  {
494  pg_log_error("system identifier does not match between base backup and streaming connection");
495  PQclear(res);
496  return false;
497  }
498  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
499  {
500  pg_log_error("starting timeline %u is not present in the server",
501  stream->timeline);
502  PQclear(res);
503  return false;
504  }
505  PQclear(res);
506  }
507 
508  /*
509  * initialize flush position to starting point, it's the caller's
510  * responsibility that that's sane.
511  */
512  lastFlushPosition = stream->startpos;
513 
514  while (1)
515  {
516  /*
517  * Fetch the timeline history file for this timeline, if we don't have
518  * it already. When streaming log to tar, this will always return
519  * false, as we are never streaming into an existing file and
520  * therefore there can be no pre-existing timeline history file.
521  */
522  if (!existsTimeLineHistoryFile(stream))
523  {
524  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
525  res = PQexec(conn, query);
526  if (PQresultStatus(res) != PGRES_TUPLES_OK)
527  {
528  /* FIXME: we might send it ok, but get an error */
529  pg_log_error("could not send replication command \"%s\": %s",
530  "TIMELINE_HISTORY", PQresultErrorMessage(res));
531  PQclear(res);
532  return false;
533  }
534 
535  /*
536  * The response to TIMELINE_HISTORY is a single row result set
537  * with two fields: filename and content
538  */
539  if (PQnfields(res) != 2 || PQntuples(res) != 1)
540  {
541  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
542  PQntuples(res), PQnfields(res), 1, 2);
543  }
544 
545  /* Write the history file to disk */
546  writeTimeLineHistoryFile(stream,
547  PQgetvalue(res, 0, 0),
548  PQgetvalue(res, 0, 1));
549 
550  PQclear(res);
551  }
552 
553  /*
554  * Before we start streaming from the requested location, check if the
555  * callback tells us to stop here.
556  */
557  if (stream->stream_stop(stream->startpos, stream->timeline, false))
558  return true;
559 
560  /* Initiate the replication stream at specified location */
561  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
562  slotcmd,
563  LSN_FORMAT_ARGS(stream->startpos),
564  stream->timeline);
565  res = PQexec(conn, query);
566  if (PQresultStatus(res) != PGRES_COPY_BOTH)
567  {
568  pg_log_error("could not send replication command \"%s\": %s",
569  "START_REPLICATION", PQresultErrorMessage(res));
570  PQclear(res);
571  return false;
572  }
573  PQclear(res);
574 
575  /* Stream the WAL */
576  res = HandleCopyStream(conn, stream, &stoppos);
577  if (res == NULL)
578  goto error;
579 
580  /*
581  * Streaming finished.
582  *
583  * There are two possible reasons for that: a controlled shutdown, or
584  * we reached the end of the current timeline. In case of
585  * end-of-timeline, the server sends a result set after Copy has
586  * finished, containing information about the next timeline. Read
587  * that, and restart streaming from the next timeline. In case of
588  * controlled shutdown, stop here.
589  */
590  if (PQresultStatus(res) == PGRES_TUPLES_OK)
591  {
592  /*
593  * End-of-timeline. Read the next timeline's ID and starting
594  * position. Usually, the starting position will match the end of
595  * the previous timeline, but there are corner cases like if the
596  * server had sent us half of a WAL record, when it was promoted.
597  * The new timeline will begin at the end of the last complete
598  * record in that case, overlapping the partial WAL record on the
599  * old timeline.
600  */
601  uint32 newtimeline;
602  bool parsed;
603 
604  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
605  PQclear(res);
606  if (!parsed)
607  goto error;
608 
609  /* Sanity check the values the server gave us */
610  if (newtimeline <= stream->timeline)
611  {
612  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
613  newtimeline, stream->timeline);
614  goto error;
615  }
616  if (stream->startpos > stoppos)
617  {
618  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
619  stream->timeline, LSN_FORMAT_ARGS(stoppos),
620  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
621  goto error;
622  }
623 
624  /* Read the final result, which should be CommandComplete. */
625  res = PQgetResult(conn);
626  if (PQresultStatus(res) != PGRES_COMMAND_OK)
627  {
628  pg_log_error("unexpected termination of replication stream: %s",
629  PQresultErrorMessage(res));
630  PQclear(res);
631  goto error;
632  }
633  PQclear(res);
634 
635  /*
636  * Loop back to start streaming from the new timeline. Always
637  * start streaming at the beginning of a segment.
638  */
639  stream->timeline = newtimeline;
640  stream->startpos = stream->startpos -
641  XLogSegmentOffset(stream->startpos, WalSegSz);
642  continue;
643  }
644  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
645  {
646  PQclear(res);
647 
648  /*
649  * End of replication (ie. controlled shut down of the server).
650  *
651  * Check if the callback thinks it's OK to stop here. If not,
652  * complain.
653  */
654  if (stream->stream_stop(stoppos, stream->timeline, false))
655  return true;
656  else
657  {
658  pg_log_error("replication stream was terminated before stop point");
659  goto error;
660  }
661  }
662  else
663  {
664  /* Server returned an error. */
665  pg_log_error("unexpected termination of replication stream: %s",
666  PQresultErrorMessage(res));
667  PQclear(res);
668  goto error;
669  }
670  }
671 
672 error:
673  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
674  pg_log_error("could not close file \"%s\": %s",
675  current_walfile_name, stream->walmethod->getlasterror());
676  walfile = NULL;
677  return false;
678 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3175
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6735
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3561
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
#define pg_log_error(...)
Definition: logging.h:80
char * sysidentifier
Definition: receivelog.h:33
XLogRecPtr startpos
Definition: receivelog.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3167
TimeLineID timeline
Definition: receivelog.h:32
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
const char *(* getlasterror)(void)
Definition: walmethods.h:78
#define sprintf
Definition: port.h:218
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:685
static Walfile * walfile
Definition: receivelog.c:31
char * replication_slot
Definition: receivelog.h:48
static bool reportFlushPosition
Definition: receivelog.c:33
unsigned int uint32
Definition: c.h:441
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
void PQclear(PGresult *res)
Definition: fe-exec.c:680
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:731
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:241
bool synchronous
Definition: receivelog.h:36
uint32 WalSegSz
Definition: streamutil.c:34
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3113
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2142
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:216
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:258
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:357