PostgreSQL Source Code  git master
receivelog.h File Reference
#include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

◆ stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 24 of file receivelog.h.

◆ StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn *conn)

Definition at line 367 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

368 {
369  int minServerMajor,
370  maxServerMajor;
371  int serverMajor;
372 
373  /*
374  * The message format used in streaming replication changed in 9.3, so we
375  * cannot stream from older servers. And we don't support servers newer
376  * than the client; it might work, but we don't know, so err on the safe
377  * side.
378  */
379  minServerMajor = 903;
380  maxServerMajor = PG_VERSION_NUM / 100;
381  serverMajor = PQserverVersion(conn) / 100;
382  if (serverMajor < minServerMajor)
383  {
384  const char *serverver = PQparameterStatus(conn, "server_version");
385 
386  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
387  progname,
388  serverver ? serverver : "'unknown'",
389  "9.3");
390  return false;
391  }
392  else if (serverMajor > maxServerMajor)
393  {
394  const char *serverver = PQparameterStatus(conn, "server_version");
395 
396  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
397  progname,
398  serverver ? serverver : "'unknown'",
399  PG_VERSION);
400  return false;
401  }
402  return true;
403 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6071
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6096
const char * progname
Definition: pg_standby.c:37
#define _(x)
Definition: elog.c:84

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn *conn,
StreamCtl *stream 
)

Definition at line 447 of file receivelog.c.

References _, CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf(), StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

448 {
449  char query[128];
450  char slotcmd[128];
451  PGresult *res;
452  XLogRecPtr stoppos;
453 
454  /*
455  * The caller should've checked the server version already, but doesn't do
456  * any harm to check it here too.
457  */
458  if (!CheckServerVersionForStreaming(conn))
459  return false;
460 
461  /*
462  * Decide whether we want to report the flush position. If we report the
463  * flush position, the primary will know what WAL we'll possibly
464  * re-request, and it can then remove older WAL safely. We must always do
465  * that when we are using slots.
466  *
467  * Reporting the flush position makes one eligible as a synchronous
468  * replica. People shouldn't include generic names in
469  * synchronous_standby_names, but we've protected them against it so far,
470  * so let's continue to do so unless specifically requested.
471  */
472  if (stream->replication_slot != NULL)
473  {
474  reportFlushPosition = true;
475  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
476  }
477  else
478  {
479  if (stream->synchronous)
480  reportFlushPosition = true;
481  else
482  reportFlushPosition = false;
483  slotcmd[0] = 0;
484  }
485 
486  if (stream->sysidentifier != NULL)
487  {
488  /* Validate system identifier hasn't changed */
489  res = PQexec(conn, "IDENTIFY_SYSTEM");
490  if (PQresultStatus(res) != PGRES_TUPLES_OK)
491  {
492  fprintf(stderr,
493  _("%s: could not send replication command \"%s\": %s"),
494  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
495  PQclear(res);
496  return false;
497  }
498  if (PQntuples(res) != 1 || PQnfields(res) < 3)
499  {
500  fprintf(stderr,
501  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
502  progname, PQntuples(res), PQnfields(res), 1, 3);
503  PQclear(res);
504  return false;
505  }
506  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
507  {
508  fprintf(stderr,
509  _("%s: system identifier does not match between base backup and streaming connection\n"),
510  progname);
511  PQclear(res);
512  return false;
513  }
514  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
515  {
516  fprintf(stderr,
517  _("%s: starting timeline %u is not present in the server\n"),
518  progname, stream->timeline);
519  PQclear(res);
520  return false;
521  }
522  PQclear(res);
523  }
524 
525  /*
526  * initialize flush position to starting point, it's the caller's
527  * responsibility that that's sane.
528  */
529  lastFlushPosition = stream->startpos;
530 
531  while (1)
532  {
533  /*
534  * Fetch the timeline history file for this timeline, if we don't have
535  * it already. When streaming log to tar, this will always return
536  * false, as we are never streaming into an existing file and
537  * therefore there can be no pre-existing timeline history file.
538  */
539  if (!existsTimeLineHistoryFile(stream))
540  {
541  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
542  res = PQexec(conn, query);
543  if (PQresultStatus(res) != PGRES_TUPLES_OK)
544  {
545  /* FIXME: we might send it ok, but get an error */
546  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
547  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
548  PQclear(res);
549  return false;
550  }
551 
552  /*
553  * The response to TIMELINE_HISTORY is a single row result set
554  * with two fields: filename and content
555  */
556  if (PQnfields(res) != 2 || PQntuples(res) != 1)
557  {
558  fprintf(stderr,
559  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
560  progname, PQntuples(res), PQnfields(res), 1, 2);
561  }
562 
563  /* Write the history file to disk */
564  writeTimeLineHistoryFile(stream,
565  PQgetvalue(res, 0, 0),
566  PQgetvalue(res, 0, 1));
567 
568  PQclear(res);
569  }
570 
571  /*
572  * Before we start streaming from the requested location, check if the
573  * callback tells us to stop here.
574  */
575  if (stream->stream_stop(stream->startpos, stream->timeline, false))
576  return true;
577 
578  /* Initiate the replication stream at specified location */
579  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
580  slotcmd,
581  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
582  stream->timeline);
583  res = PQexec(conn, query);
584  if (PQresultStatus(res) != PGRES_COPY_BOTH)
585  {
586  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
587  progname, "START_REPLICATION", PQresultErrorMessage(res));
588  PQclear(res);
589  return false;
590  }
591  PQclear(res);
592 
593  /* Stream the WAL */
594  res = HandleCopyStream(conn, stream, &stoppos);
595  if (res == NULL)
596  goto error;
597 
598  /*
599  * Streaming finished.
600  *
601  * There are two possible reasons for that: a controlled shutdown, or
602  * we reached the end of the current timeline. In case of
603  * end-of-timeline, the server sends a result set after Copy has
604  * finished, containing information about the next timeline. Read
605  * that, and restart streaming from the next timeline. In case of
606  * controlled shutdown, stop here.
607  */
608  if (PQresultStatus(res) == PGRES_TUPLES_OK)
609  {
610  /*
611  * End-of-timeline. Read the next timeline's ID and starting
612  * position. Usually, the starting position will match the end of
613  * the previous timeline, but there are corner cases like if the
614  * server had sent us half of a WAL record, when it was promoted.
615  * The new timeline will begin at the end of the last complete
616  * record in that case, overlapping the partial WAL record on the
617  * old timeline.
618  */
619  uint32 newtimeline;
620  bool parsed;
621 
622  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
623  PQclear(res);
624  if (!parsed)
625  goto error;
626 
627  /* Sanity check the values the server gave us */
628  if (newtimeline <= stream->timeline)
629  {
630  fprintf(stderr,
631  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
632  progname, newtimeline, stream->timeline);
633  goto error;
634  }
635  if (stream->startpos > stoppos)
636  {
637  fprintf(stderr,
638  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
639  progname,
640  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
641  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
642  goto error;
643  }
644 
645  /* Read the final result, which should be CommandComplete. */
646  res = PQgetResult(conn);
647  if (PQresultStatus(res) != PGRES_COMMAND_OK)
648  {
649  fprintf(stderr,
650  _("%s: unexpected termination of replication stream: %s"),
651  progname, PQresultErrorMessage(res));
652  PQclear(res);
653  goto error;
654  }
655  PQclear(res);
656 
657  /*
658  * Loop back to start streaming from the new timeline. Always
659  * start streaming at the beginning of a segment.
660  */
661  stream->timeline = newtimeline;
662  stream->startpos = stream->startpos -
663  XLogSegmentOffset(stream->startpos, WalSegSz);
664  continue;
665  }
666  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
667  {
668  PQclear(res);
669 
670  /*
671  * End of replication (ie. controlled shut down of the server).
672  *
673  * Check if the callback thinks it's OK to stop here. If not,
674  * complain.
675  */
676  if (stream->stream_stop(stoppos, stream->timeline, false))
677  return true;
678  else
679  {
680  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
681  progname);
682  goto error;
683  }
684  }
685  else
686  {
687  /* Server returned an error. */
688  fprintf(stderr,
689  _("%s: unexpected termination of replication stream: %s"),
690  progname, PQresultErrorMessage(res));
691  PQclear(res);
692  goto error;
693  }
694  }
695 
696 error:
697  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
698  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
699  progname, current_walfile_name, stream->walmethod->getlasterror());
700  walfile = NULL;
701  return false;
702 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2732
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6106
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3118
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
char * sysidentifier
Definition: receivelog.h:34
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr startpos
Definition: receivelog.h:32
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2724
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2647
const char *(* getlasterror)(void)
Definition: walmethods.h:78
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:709
static Walfile * walfile
Definition: receivelog.c:33
char * replication_slot
Definition: receivelog.h:49
static bool reportFlushPosition
Definition: receivelog.c:35
unsigned int uint32
Definition: c.h:306
stream_stop_callback stream_stop
Definition: receivelog.h:42
WalWriteMethod * walmethod
Definition: receivelog.h:47
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
void PQclear(PGresult *res)
Definition: fe-exec.c:671
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:757
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:251
bool synchronous
Definition: receivelog.h:37
int WalSegSz
Definition: pg_standby.c:39
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2663
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1897
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
#define _(x)
Definition: elog.c:84
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1753
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:268
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:367