PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.h File Reference
#include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 24 of file receivelog.h.

Function Documentation

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 366 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

367 {
368  int minServerMajor,
369  maxServerMajor;
370  int serverMajor;
371 
372  /*
373  * The message format used in streaming replication changed in 9.3, so we
374  * cannot stream from older servers. And we don't support servers newer
375  * than the client; it might work, but we don't know, so err on the safe
376  * side.
377  */
378  minServerMajor = 903;
379  maxServerMajor = PG_VERSION_NUM / 100;
380  serverMajor = PQserverVersion(conn) / 100;
381  if (serverMajor < minServerMajor)
382  {
383  const char *serverver = PQparameterStatus(conn, "server_version");
384 
385  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
386  progname,
387  serverver ? serverver : "'unknown'",
388  "9.3");
389  return false;
390  }
391  else if (serverMajor > maxServerMajor)
392  {
393  const char *serverver = PQparameterStatus(conn, "server_version");
394 
395  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
396  progname,
397  serverver ? serverver : "'unknown'",
398  PG_VERSION);
399  return false;
400  }
401  return true;
402 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6062
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6087
const char * progname
Definition: pg_standby.c:37
#define _(x)
Definition: elog.c:84
bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream
)

Definition at line 446 of file receivelog.c.

References _, CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf(), StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::temp_slot, StreamCtl::timeline, walfile, StreamCtl::walmethod, and writeTimeLineHistoryFile().

Referenced by LogStreamerMain(), and StreamLog().

447 {
448  char query[128];
449  char slotcmd[128];
450  PGresult *res;
451  XLogRecPtr stoppos;
452 
453  /*
454  * The caller should've checked the server version already, but doesn't do
455  * any harm to check it here too.
456  */
457  if (!CheckServerVersionForStreaming(conn))
458  return false;
459 
460  /*
461  * Decide whether we want to report the flush position. If we report the
462  * flush position, the primary will know what WAL we'll possibly
463  * re-request, and it can then remove older WAL safely. We must always do
464  * that when we are using slots.
465  *
466  * Reporting the flush position makes one eligible as a synchronous
467  * replica. People shouldn't include generic names in
468  * synchronous_standby_names, but we've protected them against it so far,
469  * so let's continue to do so unless specifically requested.
470  */
471  if (stream->replication_slot != NULL)
472  {
473  reportFlushPosition = true;
474  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
475  }
476  else
477  {
478  if (stream->synchronous)
479  reportFlushPosition = true;
480  else
481  reportFlushPosition = false;
482  slotcmd[0] = 0;
483  }
484 
485  if (stream->sysidentifier != NULL)
486  {
487  /* Validate system identifier hasn't changed */
488  res = PQexec(conn, "IDENTIFY_SYSTEM");
489  if (PQresultStatus(res) != PGRES_TUPLES_OK)
490  {
491  fprintf(stderr,
492  _("%s: could not send replication command \"%s\": %s"),
493  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
494  PQclear(res);
495  return false;
496  }
497  if (PQntuples(res) != 1 || PQnfields(res) < 3)
498  {
499  fprintf(stderr,
500  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
501  progname, PQntuples(res), PQnfields(res), 1, 3);
502  PQclear(res);
503  return false;
504  }
505  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
506  {
507  fprintf(stderr,
508  _("%s: system identifier does not match between base backup and streaming connection\n"),
509  progname);
510  PQclear(res);
511  return false;
512  }
513  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
514  {
515  fprintf(stderr,
516  _("%s: starting timeline %u is not present in the server\n"),
517  progname, stream->timeline);
518  PQclear(res);
519  return false;
520  }
521  PQclear(res);
522  }
523 
524  /*
525  * Create temporary replication slot if one is needed
526  */
527  if (stream->temp_slot)
528  {
529  snprintf(query, sizeof(query),
530  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
531  stream->replication_slot);
532  res = PQexec(conn, query);
533  if (PQresultStatus(res) != PGRES_TUPLES_OK)
534  {
535  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
536  progname, stream->replication_slot, PQerrorMessage(conn));
537  PQclear(res);
538  return false;
539  }
540  }
541 
542  /*
543  * initialize flush position to starting point, it's the caller's
544  * responsibility that that's sane.
545  */
546  lastFlushPosition = stream->startpos;
547 
548  while (1)
549  {
550  /*
551  * Fetch the timeline history file for this timeline, if we don't have
552  * it already. When streaming log to tar, this will always return
553  * false, as we are never streaming into an existing file and
554  * therefore there can be no pre-existing timeline history file.
555  */
556  if (!existsTimeLineHistoryFile(stream))
557  {
558  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
559  res = PQexec(conn, query);
560  if (PQresultStatus(res) != PGRES_TUPLES_OK)
561  {
562  /* FIXME: we might send it ok, but get an error */
563  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
564  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
565  PQclear(res);
566  return false;
567  }
568 
569  /*
570  * The response to TIMELINE_HISTORY is a single row result set
571  * with two fields: filename and content
572  */
573  if (PQnfields(res) != 2 || PQntuples(res) != 1)
574  {
575  fprintf(stderr,
576  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
577  progname, PQntuples(res), PQnfields(res), 1, 2);
578  }
579 
580  /* Write the history file to disk */
581  writeTimeLineHistoryFile(stream,
582  PQgetvalue(res, 0, 0),
583  PQgetvalue(res, 0, 1));
584 
585  PQclear(res);
586  }
587 
588  /*
589  * Before we start streaming from the requested location, check if the
590  * callback tells us to stop here.
591  */
592  if (stream->stream_stop(stream->startpos, stream->timeline, false))
593  return true;
594 
595  /* Initiate the replication stream at specified location */
596  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
597  slotcmd,
598  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
599  stream->timeline);
600  res = PQexec(conn, query);
601  if (PQresultStatus(res) != PGRES_COPY_BOTH)
602  {
603  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
604  progname, "START_REPLICATION", PQresultErrorMessage(res));
605  PQclear(res);
606  return false;
607  }
608  PQclear(res);
609 
610  /* Stream the WAL */
611  res = HandleCopyStream(conn, stream, &stoppos);
612  if (res == NULL)
613  goto error;
614 
615  /*
616  * Streaming finished.
617  *
618  * There are two possible reasons for that: a controlled shutdown, or
619  * we reached the end of the current timeline. In case of
620  * end-of-timeline, the server sends a result set after Copy has
621  * finished, containing information about the next timeline. Read
622  * that, and restart streaming from the next timeline. In case of
623  * controlled shutdown, stop here.
624  */
625  if (PQresultStatus(res) == PGRES_TUPLES_OK)
626  {
627  /*
628  * End-of-timeline. Read the next timeline's ID and starting
629  * position. Usually, the starting position will match the end of
630  * the previous timeline, but there are corner cases like if the
631  * server had sent us half of a WAL record, when it was promoted.
632  * The new timeline will begin at the end of the last complete
633  * record in that case, overlapping the partial WAL record on the
634  * old timeline.
635  */
636  uint32 newtimeline;
637  bool parsed;
638 
639  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
640  PQclear(res);
641  if (!parsed)
642  goto error;
643 
644  /* Sanity check the values the server gave us */
645  if (newtimeline <= stream->timeline)
646  {
647  fprintf(stderr,
648  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
649  progname, newtimeline, stream->timeline);
650  goto error;
651  }
652  if (stream->startpos > stoppos)
653  {
654  fprintf(stderr,
655  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
656  progname,
657  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
658  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
659  goto error;
660  }
661 
662  /* Read the final result, which should be CommandComplete. */
663  res = PQgetResult(conn);
664  if (PQresultStatus(res) != PGRES_COMMAND_OK)
665  {
666  fprintf(stderr,
667  _("%s: unexpected termination of replication stream: %s"),
668  progname, PQresultErrorMessage(res));
669  PQclear(res);
670  goto error;
671  }
672  PQclear(res);
673 
674  /*
675  * Loop back to start streaming from the new timeline. Always
676  * start streaming at the beginning of a segment.
677  */
678  stream->timeline = newtimeline;
679  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
680  continue;
681  }
682  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
683  {
684  PQclear(res);
685 
686  /*
687  * End of replication (ie. controlled shut down of the server).
688  *
689  * Check if the callback thinks it's OK to stop here. If not,
690  * complain.
691  */
692  if (stream->stream_stop(stoppos, stream->timeline, false))
693  return true;
694  else
695  {
696  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
697  progname);
698  goto error;
699  }
700  }
701  else
702  {
703  /* Server returned an error. */
704  fprintf(stderr,
705  _("%s: unexpected termination of replication stream: %s"),
706  progname, PQresultErrorMessage(res));
707  PQclear(res);
708  goto error;
709  }
710  }
711 
712 error:
713  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
714  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
715  progname, current_walfile_name, stream->walmethod->getlasterror());
716  walfile = NULL;
717  return false;
718 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6097
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static void error(void)
Definition: sql-dyntest.c:147
char * sysidentifier
Definition: receivelog.h:34
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr startpos
Definition: receivelog.h:32
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:725
static Walfile * walfile
Definition: receivelog.c:33
char * replication_slot
Definition: receivelog.h:49
static bool reportFlushPosition
Definition: receivelog.c:35
unsigned int uint32
Definition: c.h:268
stream_stop_callback stream_stop
Definition: receivelog.h:42
WalWriteMethod * walmethod
Definition: receivelog.h:47
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:773
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:250
const char *(* getlasterror)(void)
Definition: walmethods.h:78
bool synchronous
Definition: receivelog.h:37
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:50
#define _(x)
Definition: elog.c:84
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:267
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:366
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47