PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.h File Reference
#include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 24 of file receivelog.h.

Function Documentation

bool CheckServerVersionForStreaming ( PGconn conn)

Definition at line 360 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

361 {
362  int minServerMajor,
363  maxServerMajor;
364  int serverMajor;
365 
366  /*
367  * The message format used in streaming replication changed in 9.3, so we
368  * cannot stream from older servers. And we don't support servers newer
369  * than the client; it might work, but we don't know, so err on the safe
370  * side.
371  */
372  minServerMajor = 903;
373  maxServerMajor = PG_VERSION_NUM / 100;
374  serverMajor = PQserverVersion(conn) / 100;
375  if (serverMajor < minServerMajor)
376  {
377  const char *serverver = PQparameterStatus(conn, "server_version");
378 
379  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
380  progname,
381  serverver ? serverver : "'unknown'",
382  "9.3");
383  return false;
384  }
385  else if (serverMajor > maxServerMajor)
386  {
387  const char *serverver = PQparameterStatus(conn, "server_version");
388 
389  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
390  progname,
391  serverver ? serverver : "'unknown'",
392  PG_VERSION);
393  return false;
394  }
395  return true;
396 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:5925
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5950
const char * progname
Definition: pg_standby.c:37
#define _(x)
Definition: elog.c:84
bool ReceiveXlogStream ( PGconn conn,
StreamCtl stream 
)

Definition at line 433 of file receivelog.c.

References _, CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf(), StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::temp_slot, StreamCtl::timeline, walfile, StreamCtl::walmethod, and writeTimeLineHistoryFile().

Referenced by LogStreamerMain(), and StreamLog().

434 {
435  char query[128];
436  char slotcmd[128];
437  PGresult *res;
438  XLogRecPtr stoppos;
439 
440  /*
441  * The caller should've checked the server version already, but doesn't do
442  * any harm to check it here too.
443  */
445  return false;
446 
447  /*
448  * Decide whether we want to report the flush position. If we report
449  * the flush position, the primary will know what WAL we'll
450  * possibly re-request, and it can then remove older WAL safely.
451  * We must always do that when we are using slots.
452  *
453  * Reporting the flush position makes one eligible as a synchronous
454  * replica. People shouldn't include generic names in
455  * synchronous_standby_names, but we've protected them against it so
456  * far, so let's continue to do so unless specifically requested.
457  */
458  if (stream->replication_slot != NULL)
459  {
460  reportFlushPosition = true;
461  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
462  }
463  else
464  {
465  if (stream->synchronous)
466  reportFlushPosition = true;
467  else
468  reportFlushPosition = false;
469  slotcmd[0] = 0;
470  }
471 
472  if (stream->sysidentifier != NULL)
473  {
474  /* Validate system identifier hasn't changed */
475  res = PQexec(conn, "IDENTIFY_SYSTEM");
476  if (PQresultStatus(res) != PGRES_TUPLES_OK)
477  {
478  fprintf(stderr,
479  _("%s: could not send replication command \"%s\": %s"),
480  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
481  PQclear(res);
482  return false;
483  }
484  if (PQntuples(res) != 1 || PQnfields(res) < 3)
485  {
486  fprintf(stderr,
487  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
488  progname, PQntuples(res), PQnfields(res), 1, 3);
489  PQclear(res);
490  return false;
491  }
492  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
493  {
494  fprintf(stderr,
495  _("%s: system identifier does not match between base backup and streaming connection\n"),
496  progname);
497  PQclear(res);
498  return false;
499  }
500  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
501  {
502  fprintf(stderr,
503  _("%s: starting timeline %u is not present in the server\n"),
504  progname, stream->timeline);
505  PQclear(res);
506  return false;
507  }
508  PQclear(res);
509  }
510 
511  /*
512  * Create temporary replication slot if one is needed
513  */
514  if (stream->temp_slot)
515  {
516  snprintf(query, sizeof(query),
517  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
518  stream->replication_slot);
519  res = PQexec(conn, query);
520  if (PQresultStatus(res) != PGRES_TUPLES_OK)
521  {
522  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
523  progname, stream->replication_slot, PQerrorMessage(conn));
524  PQclear(res);
525  return false;
526  }
527  }
528 
529  /*
530  * initialize flush position to starting point, it's the caller's
531  * responsibility that that's sane.
532  */
533  lastFlushPosition = stream->startpos;
534 
535  while (1)
536  {
537  /*
538  * Fetch the timeline history file for this timeline, if we don't have
539  * it already. When streaming log to tar, this will always return
540  * false, as we are never streaming into an existing file and
541  * therefore there can be no pre-existing timeline history file.
542  */
543  if (!existsTimeLineHistoryFile(stream))
544  {
545  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
546  res = PQexec(conn, query);
547  if (PQresultStatus(res) != PGRES_TUPLES_OK)
548  {
549  /* FIXME: we might send it ok, but get an error */
550  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
551  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
552  PQclear(res);
553  return false;
554  }
555 
556  /*
557  * The response to TIMELINE_HISTORY is a single row result set
558  * with two fields: filename and content
559  */
560  if (PQnfields(res) != 2 || PQntuples(res) != 1)
561  {
562  fprintf(stderr,
563  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
564  progname, PQntuples(res), PQnfields(res), 1, 2);
565  }
566 
567  /* Write the history file to disk */
569  PQgetvalue(res, 0, 0),
570  PQgetvalue(res, 0, 1));
571 
572  PQclear(res);
573  }
574 
575  /*
576  * Before we start streaming from the requested location, check if the
577  * callback tells us to stop here.
578  */
579  if (stream->stream_stop(stream->startpos, stream->timeline, false))
580  return true;
581 
582  /* Initiate the replication stream at specified location */
583  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
584  slotcmd,
585  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
586  stream->timeline);
587  res = PQexec(conn, query);
588  if (PQresultStatus(res) != PGRES_COPY_BOTH)
589  {
590  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
591  progname, "START_REPLICATION", PQresultErrorMessage(res));
592  PQclear(res);
593  return false;
594  }
595  PQclear(res);
596 
597  /* Stream the WAL */
598  res = HandleCopyStream(conn, stream, &stoppos);
599  if (res == NULL)
600  goto error;
601 
602  /*
603  * Streaming finished.
604  *
605  * There are two possible reasons for that: a controlled shutdown, or
606  * we reached the end of the current timeline. In case of
607  * end-of-timeline, the server sends a result set after Copy has
608  * finished, containing information about the next timeline. Read
609  * that, and restart streaming from the next timeline. In case of
610  * controlled shutdown, stop here.
611  */
612  if (PQresultStatus(res) == PGRES_TUPLES_OK)
613  {
614  /*
615  * End-of-timeline. Read the next timeline's ID and starting
616  * position. Usually, the starting position will match the end of
617  * the previous timeline, but there are corner cases like if the
618  * server had sent us half of a WAL record, when it was promoted.
619  * The new timeline will begin at the end of the last complete
620  * record in that case, overlapping the partial WAL record on the
621  * the old timeline.
622  */
623  uint32 newtimeline;
624  bool parsed;
625 
626  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
627  PQclear(res);
628  if (!parsed)
629  goto error;
630 
631  /* Sanity check the values the server gave us */
632  if (newtimeline <= stream->timeline)
633  {
634  fprintf(stderr,
635  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
636  progname, newtimeline, stream->timeline);
637  goto error;
638  }
639  if (stream->startpos > stoppos)
640  {
641  fprintf(stderr,
642  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
643  progname,
644  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
645  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
646  goto error;
647  }
648 
649  /* Read the final result, which should be CommandComplete. */
650  res = PQgetResult(conn);
651  if (PQresultStatus(res) != PGRES_COMMAND_OK)
652  {
653  fprintf(stderr,
654  _("%s: unexpected termination of replication stream: %s"),
656  PQclear(res);
657  goto error;
658  }
659  PQclear(res);
660 
661  /*
662  * Loop back to start streaming from the new timeline. Always
663  * start streaming at the beginning of a segment.
664  */
665  stream->timeline = newtimeline;
666  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
667  continue;
668  }
669  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
670  {
671  PQclear(res);
672 
673  /*
674  * End of replication (ie. controlled shut down of the server).
675  *
676  * Check if the callback thinks it's OK to stop here. If not,
677  * complain.
678  */
679  if (stream->stream_stop(stoppos, stream->timeline, false))
680  return true;
681  else
682  {
683  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
684  progname);
685  goto error;
686  }
687  }
688  else
689  {
690  /* Server returned an error. */
691  fprintf(stderr,
692  _("%s: unexpected termination of replication stream: %s"),
694  PQclear(res);
695  goto error;
696  }
697  }
698 
699 error:
700  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
701  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
703  walfile = NULL;
704  return false;
705 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5960
char *(* getlasterror)(void)
Definition: walmethods.h:34
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static void error(void)
Definition: sql-dyntest.c:147
char * sysidentifier
Definition: receivelog.h:34
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr startpos
Definition: receivelog.h:32
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:712
static Walfile * walfile
Definition: receivelog.c:33
char * replication_slot
Definition: receivelog.h:47
static bool reportFlushPosition
Definition: receivelog.c:35
unsigned int uint32
Definition: c.h:265
stream_stop_callback stream_stop
Definition: receivelog.h:43
WalWriteMethod * walmethod
Definition: receivelog.h:45
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:760
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:244
bool synchronous
Definition: receivelog.h:38
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:48
#define _(x)
Definition: elog.c:84
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:261
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:360
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:26