PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
receivelog.h File Reference
#include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  StreamCtl
 

Typedefs

typedef bool(* stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl StreamCtl
 

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Typedef Documentation

typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 24 of file receivelog.h.

Function Documentation

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 363 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

364 {
365  int minServerMajor,
366  maxServerMajor;
367  int serverMajor;
368 
369  /*
370  * The message format used in streaming replication changed in 9.3, so we
371  * cannot stream from older servers. And we don't support servers newer
372  * than the client; it might work, but we don't know, so err on the safe
373  * side.
374  */
375  minServerMajor = 903;
376  maxServerMajor = PG_VERSION_NUM / 100;
377  serverMajor = PQserverVersion(conn) / 100;
378  if (serverMajor < minServerMajor)
379  {
380  const char *serverver = PQparameterStatus(conn, "server_version");
381 
382  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions older than %s\n"),
383  progname,
384  serverver ? serverver : "'unknown'",
385  "9.3");
386  return false;
387  }
388  else if (serverMajor > maxServerMajor)
389  {
390  const char *serverver = PQparameterStatus(conn, "server_version");
391 
392  fprintf(stderr, _("%s: incompatible server version %s; client does not support streaming from server versions newer than %s\n"),
393  progname,
394  serverver ? serverver : "'unknown'",
395  PG_VERSION);
396  return false;
397  }
398  return true;
399 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:5899
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5924
const char * progname
Definition: pg_standby.c:37
#define _(x)
Definition: elog.c:84
bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream 
)

Definition at line 436 of file receivelog.c.

References _, CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf(), StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::temp_slot, StreamCtl::timeline, walfile, StreamCtl::walmethod, and writeTimeLineHistoryFile().

Referenced by LogStreamerMain(), and StreamLog().

437 {
438  char query[128];
439  char slotcmd[128];
440  PGresult *res;
441  XLogRecPtr stoppos;
442 
443  /*
444  * The caller should've checked the server version already, but doesn't do
445  * any harm to check it here too.
446  */
448  return false;
449 
450  /*
451  * Decide whether we want to report the flush position. If we report
452  * the flush position, the primary will know what WAL we'll
453  * possibly re-request, and it can then remove older WAL safely.
454  * We must always do that when we are using slots.
455  *
456  * Reporting the flush position makes one eligible as a synchronous
457  * replica. People shouldn't include generic names in
458  * synchronous_standby_names, but we've protected them against it so
459  * far, so let's continue to do so unless specifically requested.
460  */
461  if (stream->replication_slot != NULL)
462  {
463  reportFlushPosition = true;
464  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
465  }
466  else
467  {
468  if (stream->synchronous)
469  reportFlushPosition = true;
470  else
471  reportFlushPosition = false;
472  slotcmd[0] = 0;
473  }
474 
475  if (stream->sysidentifier != NULL)
476  {
477  /* Validate system identifier hasn't changed */
478  res = PQexec(conn, "IDENTIFY_SYSTEM");
479  if (PQresultStatus(res) != PGRES_TUPLES_OK)
480  {
481  fprintf(stderr,
482  _("%s: could not send replication command \"%s\": %s"),
483  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
484  PQclear(res);
485  return false;
486  }
487  if (PQntuples(res) != 1 || PQnfields(res) < 3)
488  {
489  fprintf(stderr,
490  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
491  progname, PQntuples(res), PQnfields(res), 1, 3);
492  PQclear(res);
493  return false;
494  }
495  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
496  {
497  fprintf(stderr,
498  _("%s: system identifier does not match between base backup and streaming connection\n"),
499  progname);
500  PQclear(res);
501  return false;
502  }
503  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
504  {
505  fprintf(stderr,
506  _("%s: starting timeline %u is not present in the server\n"),
507  progname, stream->timeline);
508  PQclear(res);
509  return false;
510  }
511  PQclear(res);
512  }
513 
514  /*
515  * Create temporary replication slot if one is needed
516  */
517  if (stream->temp_slot)
518  {
519  snprintf(query, sizeof(query),
520  "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
521  stream->replication_slot);
522  res = PQexec(conn, query);
523  if (PQresultStatus(res) != PGRES_TUPLES_OK)
524  {
525  fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
526  progname, stream->replication_slot, PQerrorMessage(conn));
527  PQclear(res);
528  return false;
529  }
530  }
531 
532  /*
533  * initialize flush position to starting point, it's the caller's
534  * responsibility that that's sane.
535  */
536  lastFlushPosition = stream->startpos;
537 
538  while (1)
539  {
540  /*
541  * Fetch the timeline history file for this timeline, if we don't have
542  * it already. When streaming log to tar, this will always return
543  * false, as we are never streaming into an existing file and
544  * therefore there can be no pre-existing timeline history file.
545  */
546  if (!existsTimeLineHistoryFile(stream))
547  {
548  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
549  res = PQexec(conn, query);
550  if (PQresultStatus(res) != PGRES_TUPLES_OK)
551  {
552  /* FIXME: we might send it ok, but get an error */
553  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
554  progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
555  PQclear(res);
556  return false;
557  }
558 
559  /*
560  * The response to TIMELINE_HISTORY is a single row result set
561  * with two fields: filename and content
562  */
563  if (PQnfields(res) != 2 || PQntuples(res) != 1)
564  {
565  fprintf(stderr,
566  _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
567  progname, PQntuples(res), PQnfields(res), 1, 2);
568  }
569 
570  /* Write the history file to disk */
572  PQgetvalue(res, 0, 0),
573  PQgetvalue(res, 0, 1));
574 
575  PQclear(res);
576  }
577 
578  /*
579  * Before we start streaming from the requested location, check if the
580  * callback tells us to stop here.
581  */
582  if (stream->stream_stop(stream->startpos, stream->timeline, false))
583  return true;
584 
585  /* Initiate the replication stream at specified location */
586  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
587  slotcmd,
588  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
589  stream->timeline);
590  res = PQexec(conn, query);
591  if (PQresultStatus(res) != PGRES_COPY_BOTH)
592  {
593  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
594  progname, "START_REPLICATION", PQresultErrorMessage(res));
595  PQclear(res);
596  return false;
597  }
598  PQclear(res);
599 
600  /* Stream the WAL */
601  res = HandleCopyStream(conn, stream, &stoppos);
602  if (res == NULL)
603  goto error;
604 
605  /*
606  * Streaming finished.
607  *
608  * There are two possible reasons for that: a controlled shutdown, or
609  * we reached the end of the current timeline. In case of
610  * end-of-timeline, the server sends a result set after Copy has
611  * finished, containing information about the next timeline. Read
612  * that, and restart streaming from the next timeline. In case of
613  * controlled shutdown, stop here.
614  */
615  if (PQresultStatus(res) == PGRES_TUPLES_OK)
616  {
617  /*
618  * End-of-timeline. Read the next timeline's ID and starting
619  * position. Usually, the starting position will match the end of
620  * the previous timeline, but there are corner cases like if the
621  * server had sent us half of a WAL record, when it was promoted.
622  * The new timeline will begin at the end of the last complete
623  * record in that case, overlapping the partial WAL record on the
624  * the old timeline.
625  */
626  uint32 newtimeline;
627  bool parsed;
628 
629  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
630  PQclear(res);
631  if (!parsed)
632  goto error;
633 
634  /* Sanity check the values the server gave us */
635  if (newtimeline <= stream->timeline)
636  {
637  fprintf(stderr,
638  _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
639  progname, newtimeline, stream->timeline);
640  goto error;
641  }
642  if (stream->startpos > stoppos)
643  {
644  fprintf(stderr,
645  _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
646  progname,
647  stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
648  newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
649  goto error;
650  }
651 
652  /* Read the final result, which should be CommandComplete. */
653  res = PQgetResult(conn);
654  if (PQresultStatus(res) != PGRES_COMMAND_OK)
655  {
656  fprintf(stderr,
657  _("%s: unexpected termination of replication stream: %s"),
659  PQclear(res);
660  goto error;
661  }
662  PQclear(res);
663 
664  /*
665  * Loop back to start streaming from the new timeline. Always
666  * start streaming at the beginning of a segment.
667  */
668  stream->timeline = newtimeline;
669  stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
670  continue;
671  }
672  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
673  {
674  PQclear(res);
675 
676  /*
677  * End of replication (ie. controlled shut down of the server).
678  *
679  * Check if the callback thinks it's OK to stop here. If not,
680  * complain.
681  */
682  if (stream->stream_stop(stoppos, stream->timeline, false))
683  return true;
684  else
685  {
686  fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
687  progname);
688  goto error;
689  }
690  }
691  else
692  {
693  /* Server returned an error. */
694  fprintf(stderr,
695  _("%s: unexpected termination of replication stream: %s"),
697  PQclear(res);
698  goto error;
699  }
700  }
701 
702 error:
703  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
704  fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
706  walfile = NULL;
707  return false;
708 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5934
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
static void error(void)
Definition: sql-dyntest.c:147
char * sysidentifier
Definition: receivelog.h:34
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
XLogRecPtr startpos
Definition: receivelog.h:32
const char * progname
Definition: pg_standby.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
TimeLineID timeline
Definition: receivelog.h:33
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:715
static Walfile * walfile
Definition: receivelog.c:33
char * replication_slot
Definition: receivelog.h:47
static bool reportFlushPosition
Definition: receivelog.c:35
unsigned int uint32
Definition: c.h:268
stream_stop_callback stream_stop
Definition: receivelog.h:43
WalWriteMethod * walmethod
Definition: receivelog.h:45
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:36
void PQclear(PGresult *res)
Definition: fe-exec.c:650
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:763
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:247
const char *(* getlasterror)(void)
Definition: walmethods.h:78
bool synchronous
Definition: receivelog.h:38
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2612
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:34
bool temp_slot
Definition: receivelog.h:48
#define _(x)
Definition: elog.c:84
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1702
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:264
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:363
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47