PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.h File Reference
#include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 24 of file receivelog.h.

Function Documentation

bool CheckServerVersionForStreaming ( PGconn *conn )

Definition at line 364 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

365 {
366  int minServerMajor,
367  maxServerMajor;
368  int serverMajor;
369 
370  /*
371  * The message format used in streaming replication changed in 9.3, so we
372  * cannot stream from older servers. And we don't support servers newer
373  * than the client; it might work, but we don't know, so err on the safe
374  * side.
375  */
376  minServerMajor = 903;
377  maxServerMajor = PG_VERSION_NUM / 100;
378  serverMajor = PQserverVersion(conn) / 100;
379  if (serverMajor < minServerMajor)
380  {
381  const char *serverver = PQparameterStatus(conn, "server_version");
382 
383  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
384  progname,
385  serverver ? serverver : "'unknown'",
386  "9.3");
387  return false;
388  }
389  else if (serverMajor > maxServerMajor)
390  {
391  const char *serverver = PQparameterStatus(conn, "server_version");
392 
393  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
394  progname,
395  serverver ? serverver : "'unknown'",
396  PG_VERSION);
397  return false;
398  }
399  return true;
400 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:5953
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5978
const char * progname
Definition: pg_standby.c:37
#define _(x)
Definition: elog.c:84
bool ReceiveXlogStream ( PGconn *conn,
StreamCtl *stream
)

Definition at line 444 of file receivelog.c.

References _, CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf(), StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::temp_slot, StreamCtl::timeline, walfile, StreamCtl::walmethod, and writeTimeLineHistoryFile().

Referenced by LogStreamerMain(), and StreamLog().

445 {
446  char query[128];
447  char slotcmd[128];
448  PGresult *res;
449  XLogRecPtr stoppos;
450 
451  /*
452  * The caller should've checked the server version already, but doesn't do
453  * any harm to check it here too.
454  */
455  if (!CheckServerVersionForStreaming(conn))
456  return false;
457 
458  /*
459  * Decide whether we want to report the flush position. If we report the
460  * flush position, the primary will know what WAL we'll possibly
461  * re-request, and it can then remove older WAL safely. We must always do
462  * that when we are using slots.
463  *
464  * Reporting the flush position makes one eligible as a synchronous
465  * replica. People shouldn't include generic names in
466  * synchronous_standby_names, but we've protected them against it so far,
467  * so let's continue to do so unless specifically requested.
468  */
469  if (stream->replication_slot != NULL)
470  {
471  reportFlushPosition = true;
472  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
473  }
474  else
475  {
476  if (stream->synchronous)
477  reportFlushPosition = true;
478  else
479  reportFlushPosition = false;
480  slotcmd[0] = 0;
481  }
482 
483  if (stream->sysidentifier != NULL)
484  {
485  /* Validate system identifier hasn't changed */
486  res = PQexec(conn, "IDENTIFY_SYSTEM");
487  if (PQresultStatus(res) != PGRES_TUPLES_OK)
488  {
489  fprintf(stderr,
490  _("%s: could not send replication command \"%s\": %s"),
491  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
492  PQclear(res);
493  return false;
494  }
495  if (PQntuples(res) != 1 || PQnfields(res) < 3)
496  {
497  fprintf(stderr,
498  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
499  progname, PQntuples(res), PQnfields(res), 1, 3);
500  PQclear(res);
501  return false;
502  }
503  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
504  {
505  fprintf(stderr,
506  _("%s: system identifier does not match between base backup and streaming connection\n"),
507  progname);
508  PQclear(res);
509  return false;
510  }
511  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
512  {
513  fprintf(stderr,
514  _("%s: starting timeline %u is not present in the server\n"),
515  progname, stream->timeline);
516  PQclear(res);
517  return false;
518  }
519  PQclear(res);
520  }
521 
522  /*
523  * Create temporary replication slot if one is needed
524  */
525  if (stream->temp_slot)
526  {
527  snprintf(query, sizeof(query),
528  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
529  stream->replication_slot);
530  res = PQexec(conn, query);
531  if (PQresultStatus(res) != PGRES_TUPLES_OK)
532  {
533  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
534  progname, stream->replication_slot, PQerrorMessage(conn));
535  PQclear(res);
536  return false;
537  }
538  }
539 
540  /*
541  * initialize flush position to starting point, it's the caller's
542  * responsibility that that's sane.
543  */
544  lastFlushPosition = stream->startpos;
545 
546  while (1)
547  {
548  /*
549  * Fetch the timeline history file for this timeline, if we don't have
550  * it already. When streaming log to tar, this will always return
551  * false, as we are never streaming into an existing file and
552  * therefore there can be no pre-existing timeline history file.
553  */
554  if (!existsTimeLineHistoryFile(stream))
555  {
556  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
557  res = PQexec(conn, query);
558  if (PQresultStatus(res) != PGRES_TUPLES_OK)
559  {
560  /* FIXME: we might send it ok, but get an error */
561  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
562  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
563  PQclear(res);
564  return false;
565  }
566 
567  /*
568  * The response to TIMELINE_HISTORY is a single row result set
569  * with two fields: filename and content
570  */
571  if (PQnfields(res) != 2 || PQntuples(res) != 1)
572  {
573  fprintf(stderr,
574  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
575  progname, PQntuples(res), PQnfields(res), 1, 2);
576  }
577 
578  /* Write the history file to disk */
579  writeTimeLineHistoryFile(stream,
580  PQgetvalue(res, 0, 0),
581  PQgetvalue(res, 0, 1));
582 
583  PQclear(res);
584  }
585 
586  /*
587  * Before we start streaming from the requested location, check if the
588  * callback tells us to stop here.
589  */
590  if (stream->stream_stop(stream->startpos, stream->timeline, false))
591  return true;
592 
593  /* Initiate the replication stream at specified location */
594  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
595  slotcmd,
596  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
597  stream->timeline);
598  res = PQexec(conn, query);
599  if (PQresultStatus(res) != PGRES_COPY_BOTH)
600  {
601  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
602  progname, "START_REPLICATION", PQresultErrorMessage(res));
603  PQclear(res);
604  return false;
605  }
606  PQclear(res);
607 
608  /* Stream the WAL */
609  res = HandleCopyStream(conn, stream, &stoppos);
610  if (res == NULL)
611  goto error;
612 
613  /*
614  * Streaming finished.
615  *
616  * There are two possible reasons for that: a controlled shutdown, or
617  * we reached the end of the current timeline. In case of
618  * end-of-timeline, the server sends a result set after Copy has
619  * finished, containing information about the next timeline. Read
620  * that, and restart streaming from the next timeline. In case of
621  * controlled shutdown, stop here.
622  */
623  if (PQresultStatus(res) == PGRES_TUPLES_OK)
624  {
625  /*
626  * End-of-timeline. Read the next timeline's ID and starting
627  * position. Usually, the starting position will match the end of
628  * the previous timeline, but there are corner cases like if the
629  * server had sent us half of a WAL record, when it was promoted.
630  * The new timeline will begin at the end of the last complete
631  * record in that case, overlapping the partial WAL record on the
632  * old timeline.
633  */
634  uint32 newtimeline;
635  bool parsed;
636 
637  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
638  PQclear(res);
639  if (!parsed)
640  goto error;
641 
642  /* Sanity check the values the server gave us */
643  if (newtimeline <= stream->timeline)
644  {
645  fprintf(stderr,
646  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
647  progname, newtimeline, stream->timeline);
648  goto error;
649  }
650  if (stream->startpos > stoppos)
651  {
652  fprintf(stderr,
653  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
654  progname,
655  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
656  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
657  goto error;
658  }
659 
660  /* Read the final result, which should be CommandComplete. */
661  res = PQgetResult(conn);
662  if (PQresultStatus(res) != PGRES_COMMAND_OK)
663  {
664  fprintf(stderr,
665  _("%s: unexpected termination of replication stream: %s"),
666  progname, PQresultErrorMessage(res));
667  PQclear(res);
668  goto error;
669  }
670  PQclear(res);
671 
672  /*
673  * Loop back to start streaming from the new timeline. Always
674  * start streaming at the beginning of a segment.
675  */
676  stream->timeline = newtimeline;
677  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
678  continue;
679  }
680  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
681  {
682  PQclear(res);
683 
684  /*
685  * End of replication (ie. controlled shut down of the server).
686  *
687  * Check if the callback thinks it's OK to stop here. If not,
688  * complain.
689  */
690  if (stream->stream_stop(stoppos, stream->timeline, false))
691  return true;
692  else
693  {
694  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
695  progname);
696  goto error;
697  }
698  }
699  else
700  {
701  /* Server returned an error. */
702  fprintf(stderr,
703  _("%s: unexpected termination of replication stream: %s"),
704  progname, PQresultErrorMessage(res));
705  PQclear(res);
706  goto error;
707  }
708  }
709 
710 error:
711  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
712  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
713  progname, current_walfile_name, stream->walmethod->getlasterror());
714  walfile = NULL;
715  return false;
716 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5988
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static void error(void)
Definition: sql-dyntest.c:147
char * sysidentifier
Definition: receivelog.h:34
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
XLogRecPtr startpos
Definition: receivelog.h:32
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:723
static Walfile * walfile
Definition: receivelog.c:33
char * replication_slot
Definition: receivelog.h:50
static bool reportFlushPosition
Definition: receivelog.c:35
unsigned int uint32
Definition: c.h:268
stream_stop_callback stream_stop
Definition: receivelog.h:43
WalWriteMethod * walmethod
Definition: receivelog.h:48
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:771
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:248
const char *(* getlasterror)(void)
Definition: walmethods.h:78
bool synchronous
Definition: receivelog.h:38
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:51
#define _(x)
Definition: elog.c:84
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:265
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:364
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47