PostgreSQL Source Code  git master
receivelog.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "receivelog.h"
#include "streamutil.h"
Include dependency graph for receivelog.c:

Go to the source code of this file.

Functions

static PGresultHandleCopyStream (PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
 
static int CopyStreamPoll (PGconn *conn, long timeout_ms, pgsocket stop_socket)
 
static int CopyStreamReceive (PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
 
static bool ProcessKeepaliveMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
 
static bool ProcessXLogDataMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
 
static PGresultHandleEndOfCopyStream (PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
 
static bool CheckCopyStreamStop (PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
 
static long CalculateCopyStreamSleeptime (TimestampTz now, int standby_message_timeout, TimestampTz last_status)
 
static bool ReadEndOfStreamingResult (PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
 
static bool mark_file_as_archived (StreamCtl *stream, const char *fname)
 
static bool open_walfile (StreamCtl *stream, XLogRecPtr startpoint)
 
static bool close_walfile (StreamCtl *stream, XLogRecPtr pos)
 
static bool existsTimeLineHistoryFile (StreamCtl *stream)
 
static bool writeTimeLineHistoryFile (StreamCtl *stream, char *filename, char *content)
 
static bool sendFeedback (PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
 
bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Variables

static Walfilewalfile = NULL
 
static char current_walfile_name [MAXPGPATH] = ""
 
static bool reportFlushPosition = false
 
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
 
static bool still_sending = true
 

Function Documentation

◆ CalculateCopyStreamSleeptime()

static long CalculateCopyStreamSleeptime ( TimestampTz  now,
int  standby_message_timeout,
TimestampTz  last_status 
)
static

Definition at line 1239 of file receivelog.c.

References feTimestampDifference(), and still_sending.

Referenced by HandleCopyStream().

1241 {
1242  TimestampTz status_targettime = 0;
1243  long sleeptime;
1244 
1246  status_targettime = last_status +
1247  (standby_message_timeout - 1) * ((int64) 1000);
1248 
1249  if (status_targettime > 0)
1250  {
1251  long secs;
1252  int usecs;
1253 
1255  status_targettime,
1256  &secs,
1257  &usecs);
1258  /* Always sleep at least 1 sec */
1259  if (secs <= 0)
1260  {
1261  secs = 1;
1262  usecs = 0;
1263  }
1264 
1265  sleeptime = secs * 1000 + usecs / 1000;
1266  }
1267  else
1268  sleeptime = -1;
1269 
1270  return sleeptime;
1271 }
static bool still_sending
Definition: receivelog.c:36
int64 TimestampTz
Definition: timestamp.h:39
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:631
static int standby_message_timeout
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ CheckCopyStreamStop()

static bool CheckCopyStreamStop ( PGconn conn,
StreamCtl stream,
XLogRecPtr  blockpos 
)
static

Definition at line 1214 of file receivelog.c.

References close_walfile(), pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, and StreamCtl::timeline.

Referenced by HandleCopyStream().

1215 {
1216  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1217  {
1218  if (!close_walfile(stream, blockpos))
1219  {
1220  /* Potential error message is written by close_walfile */
1221  return false;
1222  }
1223  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1224  {
1225  pg_log_error("could not send copy-end packet: %s",
1226  PQerrorMessage(conn));
1227  return false;
1228  }
1229  still_sending = false;
1230  }
1231 
1232  return true;
1233 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
static bool still_sending
Definition: receivelog.c:36
#define pg_log_error(...)
Definition: logging.h:80
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2600
TimeLineID timeline
Definition: receivelog.h:32
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:186
stream_stop_callback stream_stop
Definition: receivelog.h:41

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn conn)

Definition at line 366 of file receivelog.c.

References pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

367 {
368  int minServerMajor,
369  maxServerMajor;
370  int serverMajor;
371 
372  /*
373  * The message format used in streaming replication changed in 9.3, so we
374  * cannot stream from older servers. And we don't support servers newer
375  * than the client; it might work, but we don't know, so err on the safe
376  * side.
377  */
378  minServerMajor = 903;
379  maxServerMajor = PG_VERSION_NUM / 100;
380  serverMajor = PQserverVersion(conn) / 100;
381  if (serverMajor < minServerMajor)
382  {
383  const char *serverver = PQparameterStatus(conn, "server_version");
384 
385  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
386  serverver ? serverver : "'unknown'",
387  "9.3");
388  return false;
389  }
390  else if (serverMajor > maxServerMajor)
391  {
392  const char *serverver = PQparameterStatus(conn, "server_version");
393 
394  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
395  serverver ? serverver : "'unknown'",
396  PG_VERSION);
397  return false;
398  }
399  return true;
400 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6702
#define pg_log_error(...)
Definition: logging.h:80
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6727

◆ close_walfile()

static bool close_walfile ( StreamCtl stream,
XLogRecPtr  pos 
)
static

Definition at line 186 of file receivelog.c.

References WalWriteMethod::close, CLOSE_NO_RENAME, CLOSE_NORMAL, CLOSE_UNLINK, current_walfile_name, WalWriteMethod::get_current_pos, WalWriteMethod::getlasterror, lastFlushPosition, StreamCtl::mark_done, mark_file_as_archived(), StreamCtl::partial_suffix, pg_log_error, pg_log_info, walfile, StreamCtl::walmethod, and WalSegSz.

Referenced by CheckCopyStreamStop(), HandleEndOfCopyStream(), and ProcessXLogDataMsg().

187 {
188  off_t currpos;
189  int r;
190 
191  if (walfile == NULL)
192  return true;
193 
194  currpos = stream->walmethod->get_current_pos(walfile);
195  if (currpos == -1)
196  {
197  pg_log_error("could not determine seek position in file \"%s\": %s",
199  stream->walmethod->close(walfile, CLOSE_UNLINK);
200  walfile = NULL;
201 
202  return false;
203  }
204 
205  if (stream->partial_suffix)
206  {
207  if (currpos == WalSegSz)
208  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
209  else
210  {
211  pg_log_info("not renaming \"%s%s\", segment is not complete",
213  r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
214  }
215  }
216  else
217  r = stream->walmethod->close(walfile, CLOSE_NORMAL);
218 
219  walfile = NULL;
220 
221  if (r != 0)
222  {
223  pg_log_error("could not close file \"%s\": %s",
225  return false;
226  }
227 
228  /*
229  * Mark file as archived if requested by the caller - pg_basebackup needs
230  * to do so as files can otherwise get archived again after promotion of a
231  * new node. This is in line with walreceiver.c always doing a
232  * XLogArchiveForceDone() after a complete segment.
233  */
234  if (currpos == WalSegSz && stream->mark_done)
235  {
236  /* writes error message if failed */
238  return false;
239  }
240 
241  lastFlushPosition = pos;
242  return true;
243 }
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
#define pg_log_error(...)
Definition: logging.h:80
char * partial_suffix
Definition: receivelog.h:47
const char *(* getlasterror)(void)
Definition: walmethods.h:87
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:71
static Walfile * walfile
Definition: receivelog.c:31
bool mark_done
Definition: receivelog.h:37
WalWriteMethod * walmethod
Definition: receivelog.h:46
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
uint32 WalSegSz
Definition: streamutil.c:34
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:57
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define pg_log_info(...)
Definition: logging.h:88

◆ CopyStreamPoll()

static int CopyStreamPoll ( PGconn conn,
long  timeout_ms,
pgsocket  stop_socket 
)
static

Definition at line 869 of file receivelog.c.

References EINTR, Max, pg_log_error, PGINVALID_SOCKET, PQerrorMessage(), PQsocket(), and select.

Referenced by CopyStreamReceive().

870 {
871  int ret;
872  fd_set input_mask;
873  int connsocket;
874  int maxfd;
875  struct timeval timeout;
876  struct timeval *timeoutptr;
877 
878  connsocket = PQsocket(conn);
879  if (connsocket < 0)
880  {
881  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
882  return -1;
883  }
884 
885  FD_ZERO(&input_mask);
886  FD_SET(connsocket, &input_mask);
887  maxfd = connsocket;
888  if (stop_socket != PGINVALID_SOCKET)
889  {
890  FD_SET(stop_socket, &input_mask);
891  maxfd = Max(maxfd, stop_socket);
892  }
893 
894  if (timeout_ms < 0)
895  timeoutptr = NULL;
896  else
897  {
898  timeout.tv_sec = timeout_ms / 1000L;
899  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900  timeoutptr = &timeout;
901  }
902 
903  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
904 
905  if (ret < 0)
906  {
907  if (errno == EINTR)
908  return 0; /* Got a signal, so not an error */
909  pg_log_error("%s() failed: %m", "select");
910  return -1;
911  }
912  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
913  return 1; /* Got input on connection socket */
914 
915  return 0; /* Got timeout or input on stop_socket */
916 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
#define pg_log_error(...)
Definition: logging.h:80
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
#define PGINVALID_SOCKET
Definition: port.h:33
#define Max(x, y)
Definition: c.h:980
#define EINTR
Definition: win32_port.h:343
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6763

◆ CopyStreamReceive()

static int CopyStreamReceive ( PGconn conn,
long  timeout,
pgsocket  stop_socket,
char **  buffer 
)
static

Definition at line 931 of file receivelog.c.

References copybuf, CopyStreamPoll(), pg_log_error, PQconsumeInput(), PQerrorMessage(), PQfreemem(), and PQgetCopyData().

Referenced by HandleCopyStream().

933 {
934  char *copybuf = NULL;
935  int rawlen;
936 
937  if (*buffer != NULL)
938  PQfreemem(*buffer);
939  *buffer = NULL;
940 
941  /* Try to receive a CopyData message */
942  rawlen = PQgetCopyData(conn, &copybuf, 1);
943  if (rawlen == 0)
944  {
945  int ret;
946 
947  /*
948  * No data available. Wait for some to appear, but not longer than
949  * the specified timeout, so that we can ping the server. Also stop
950  * waiting if input appears on stop_socket.
951  */
952  ret = CopyStreamPoll(conn, timeout, stop_socket);
953  if (ret <= 0)
954  return ret;
955 
956  /* Now there is actually data on the socket */
957  if (PQconsumeInput(conn) == 0)
958  {
959  pg_log_error("could not receive data from WAL stream: %s",
960  PQerrorMessage(conn));
961  return -1;
962  }
963 
964  /* Now that we've consumed some input, try again */
965  rawlen = PQgetCopyData(conn, &copybuf, 1);
966  if (rawlen == 0)
967  return 0;
968  }
969  if (rawlen == -1) /* end-of-streaming or error */
970  return -2;
971  if (rawlen == -2)
972  {
973  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
974  return -1;
975  }
976 
977  /* Return received messages to caller */
978  *buffer = copybuf;
979  return rawlen;
980 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:869
#define pg_log_error(...)
Definition: logging.h:80
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2668
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
StringInfo copybuf
Definition: tablesync.c:124
void PQfreemem(void *ptr)
Definition: fe-exec.c:3796

◆ existsTimeLineHistoryFile()

static bool existsTimeLineHistoryFile ( StreamCtl stream)
static

Definition at line 250 of file receivelog.c.

References WalWriteMethod::existsfile, MAXFNAMELEN, StreamCtl::timeline, TLHistoryFileName, and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

251 {
252  char histfname[MAXFNAMELEN];
253 
254  /*
255  * Timeline 1 never has a history file. We treat that as if it existed,
256  * since we never need to stream it.
257  */
258  if (stream->timeline == 1)
259  return true;
260 
261  TLHistoryFileName(histfname, stream->timeline);
262 
263  return stream->walmethod->existsfile(histfname);
264 }
TimeLineID timeline
Definition: receivelog.h:32
#define TLHistoryFileName(fname, tli)
WalWriteMethod * walmethod
Definition: receivelog.h:46
#define MAXFNAMELEN
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50

◆ HandleCopyStream()

static PGresult * HandleCopyStream ( PGconn conn,
StreamCtl stream,
XLogRecPtr stoppos 
)
static

Definition at line 740 of file receivelog.c.

References CalculateCopyStreamSleeptime(), CheckCopyStreamStop(), copybuf, CopyStreamReceive(), current_walfile_name, error(), feGetCurrentTimestamp(), feTimestampDifferenceExceeds(), WalWriteMethod::getlasterror, HandleEndOfCopyStream(), lastFlushPosition, now(), pg_log_error, pg_log_fatal, PQfreemem(), ProcessKeepaliveMsg(), ProcessXLogDataMsg(), sendFeedback(), StreamCtl::standby_message_timeout, StreamCtl::startpos, still_sending, StreamCtl::stop_socket, WalWriteMethod::sync, StreamCtl::synchronous, walfile, and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

742 {
743  char *copybuf = NULL;
744  TimestampTz last_status = -1;
745  XLogRecPtr blockpos = stream->startpos;
746 
747  still_sending = true;
748 
749  while (1)
750  {
751  int r;
753  long sleeptime;
754 
755  /*
756  * Check if we should continue streaming, or abort at this point.
757  */
758  if (!CheckCopyStreamStop(conn, stream, blockpos))
759  goto error;
760 
761  now = feGetCurrentTimestamp();
762 
763  /*
764  * If synchronous option is true, issue sync command as soon as there
765  * are WAL data which has not been flushed yet.
766  */
767  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
768  {
769  if (stream->walmethod->sync(walfile) != 0)
770  {
771  pg_log_fatal("could not fsync file \"%s\": %s",
773  exit(1);
774  }
775  lastFlushPosition = blockpos;
776 
777  /*
778  * Send feedback so that the server sees the latest WAL locations
779  * immediately.
780  */
781  if (!sendFeedback(conn, blockpos, now, false))
782  goto error;
783  last_status = now;
784  }
785 
786  /*
787  * Potentially send a status message to the primary
788  */
789  if (still_sending && stream->standby_message_timeout > 0 &&
790  feTimestampDifferenceExceeds(last_status, now,
791  stream->standby_message_timeout))
792  {
793  /* Time to send feedback! */
794  if (!sendFeedback(conn, blockpos, now, false))
795  goto error;
796  last_status = now;
797  }
798 
799  /*
800  * Calculate how long send/receive loops should sleep
801  */
802  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
803  last_status);
804 
805  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
806  while (r != 0)
807  {
808  if (r == -1)
809  goto error;
810  if (r == -2)
811  {
812  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
813 
814  if (res == NULL)
815  goto error;
816  else
817  return res;
818  }
819 
820  /* Check the message type. */
821  if (copybuf[0] == 'k')
822  {
823  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
824  &last_status))
825  goto error;
826  }
827  else if (copybuf[0] == 'w')
828  {
829  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
830  goto error;
831 
832  /*
833  * Check if we should continue streaming, or abort at this
834  * point.
835  */
836  if (!CheckCopyStreamStop(conn, stream, blockpos))
837  goto error;
838  }
839  else
840  {
841  pg_log_error("unrecognized streaming header: \"%c\"",
842  copybuf[0]);
843  goto error;
844  }
845 
846  /*
847  * Process the received data, and any subsequent data we can read
848  * without blocking.
849  */
850  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
851  }
852  }
853 
854 error:
855  if (copybuf != NULL)
856  PQfreemem(copybuf);
857  return NULL;
858 }
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:612
static void error(void)
Definition: sql-dyntest.c:147
static bool still_sending
Definition: receivelog.c:36
int64 TimestampTz
Definition: timestamp.h:39
#define pg_log_error(...)
Definition: logging.h:80
XLogRecPtr startpos
Definition: receivelog.h:31
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1239
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:653
int(* sync)(Walfile f)
Definition: walmethods.h:76
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:986
const char *(* getlasterror)(void)
Definition: walmethods.h:87
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:931
static Walfile * walfile
Definition: receivelog.c:31
WalWriteMethod * walmethod
Definition: receivelog.h:46
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:328
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1173
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1043
StringInfo copybuf
Definition: tablesync.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool synchronous
Definition: receivelog.h:36
pgsocket stop_socket
Definition: receivelog.h:43
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1214
int standby_message_timeout
Definition: receivelog.h:35
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
void PQfreemem(void *ptr)
Definition: fe-exec.c:3796
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define pg_log_fatal(...)
Definition: logging.h:76

◆ HandleEndOfCopyStream()

static PGresult * HandleEndOfCopyStream ( PGconn conn,
StreamCtl stream,
char *  copybuf,
XLogRecPtr  blockpos,
XLogRecPtr stoppos 
)
static

Definition at line 1173 of file receivelog.c.

References close_walfile(), pg_log_error, PGRES_COPY_IN, PQclear(), PQerrorMessage(), PQflush(), PQfreemem(), PQgetResult(), PQputCopyEnd(), PQresultStatus(), and still_sending.

Referenced by HandleCopyStream().

1175 {
1176  PGresult *res = PQgetResult(conn);
1177 
1178  /*
1179  * The server closed its end of the copy stream. If we haven't closed
1180  * ours already, we need to do so now, unless the server threw an error,
1181  * in which case we don't.
1182  */
1183  if (still_sending)
1184  {
1185  if (!close_walfile(stream, blockpos))
1186  {
1187  /* Error message written in close_walfile() */
1188  PQclear(res);
1189  return NULL;
1190  }
1191  if (PQresultStatus(res) == PGRES_COPY_IN)
1192  {
1193  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1194  {
1195  pg_log_error("could not send copy-end packet: %s",
1196  PQerrorMessage(conn));
1197  PQclear(res);
1198  return NULL;
1199  }
1200  res = PQgetResult(conn);
1201  }
1202  still_sending = false;
1203  }
1204  if (copybuf != NULL)
1205  PQfreemem(copybuf);
1206  *stoppos = blockpos;
1207  return res;
1208 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
static bool still_sending
Definition: receivelog.c:36
#define pg_log_error(...)
Definition: logging.h:80
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2600
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:186
StringInfo copybuf
Definition: tablesync.c:124
void PQclear(PGresult *res)
Definition: fe-exec.c:694
void PQfreemem(void *ptr)
Definition: fe-exec.c:3796
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ mark_file_as_archived()

static bool mark_file_as_archived ( StreamCtl stream,
const char *  fname 
)
static

Definition at line 57 of file receivelog.c.

References WalWriteMethod::close, CLOSE_NORMAL, WalWriteMethod::getlasterror, MAXPGPATH, WalWriteMethod::open_for_write, pg_log_error, snprintf, and StreamCtl::walmethod.

Referenced by close_walfile(), and writeTimeLineHistoryFile().

58 {
59  Walfile *f;
60  static char tmppath[MAXPGPATH];
61 
62  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
63  fname);
64 
65  f = stream->walmethod->open_for_write(tmppath, NULL, 0);
66  if (f == NULL)
67  {
68  pg_log_error("could not create archive status file \"%s\": %s",
69  tmppath, stream->walmethod->getlasterror());
70  return false;
71  }
72 
73  stream->walmethod->close(f, CLOSE_NORMAL);
74 
75  return true;
76 }
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
#define pg_log_error(...)
Definition: logging.h:80
const char *(* getlasterror)(void)
Definition: walmethods.h:87
#define MAXPGPATH
WalWriteMethod * walmethod
Definition: receivelog.h:46
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
#define snprintf
Definition: port.h:216

◆ open_walfile()

static bool open_walfile ( StreamCtl stream,
XLogRecPtr  startpoint 
)
static

Definition at line 88 of file receivelog.c.

References WalWriteMethod::close, CLOSE_UNLINK, WalWriteMethod::compression, current_walfile_name, WalWriteMethod::existsfile, fn(), WalWriteMethod::get_file_name, WalWriteMethod::get_file_size, WalWriteMethod::getlasterror, ngettext, WalWriteMethod::open_for_write, StreamCtl::partial_suffix, pg_free(), pg_log_error, pg_log_fatal, WalWriteMethod::sync, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, XLByteToSeg, and XLogFileName.

Referenced by ProcessXLogDataMsg().

89 {
90  Walfile *f;
91  char *fn;
92  ssize_t size;
93  XLogSegNo segno;
94 
95  XLByteToSeg(startpoint, segno, WalSegSz);
97 
98  /* Note that this considers the compression used if necessary */
100  stream->partial_suffix);
101 
102  /*
103  * When streaming to files, if an existing file exists we verify that it's
104  * either empty (just created), or a complete WalSegSz segment (in which
105  * case it has been created and padded). Anything else indicates a corrupt
106  * file. Compressed files have no need for padding, so just ignore this
107  * case.
108  *
109  * When streaming to tar, no file with this name will exist before, so we
110  * never have to verify a size.
111  */
112  if (stream->walmethod->compression() == 0 &&
113  stream->walmethod->existsfile(fn))
114  {
115  size = stream->walmethod->get_file_size(fn);
116  if (size < 0)
117  {
118  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
119  fn, stream->walmethod->getlasterror());
120  pg_free(fn);
121  return false;
122  }
123  if (size == WalSegSz)
124  {
125  /* Already padded file. Open it for use */
127  if (f == NULL)
128  {
129  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
130  fn, stream->walmethod->getlasterror());
131  pg_free(fn);
132  return false;
133  }
134 
135  /* fsync file in case of a previous crash */
136  if (stream->walmethod->sync(f) != 0)
137  {
138  pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
139  fn, stream->walmethod->getlasterror());
140  stream->walmethod->close(f, CLOSE_UNLINK);
141  exit(1);
142  }
143 
144  walfile = f;
145  pg_free(fn);
146  return true;
147  }
148  if (size != 0)
149  {
150  /* if write didn't set errno, assume problem is no disk space */
151  if (errno == 0)
152  errno = ENOSPC;
153  pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
154  "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
155  size),
156  fn, (int) size, WalSegSz);
157  pg_free(fn);
158  return false;
159  }
160  /* File existed and was empty, so fall through and open */
161  }
162 
163  /* No file existed, so create one */
164 
166  stream->partial_suffix, WalSegSz);
167  if (f == NULL)
168  {
169  pg_log_error("could not open write-ahead log file \"%s\": %s",
170  fn, stream->walmethod->getlasterror());
171  pg_free(fn);
172  return false;
173  }
174 
175  pg_free(fn);
176  walfile = f;
177  return true;
178 }
char *(* get_file_name)(const char *pathname, const char *temp_suffix)
Definition: walmethods.h:59
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
#define pg_log_error(...)
Definition: logging.h:80
char * partial_suffix
Definition: receivelog.h:47
int(* sync)(Walfile f)
Definition: walmethods.h:76
TimeLineID timeline
Definition: receivelog.h:32
const char *(* getlasterror)(void)
Definition: walmethods.h:87
static Walfile * walfile
Definition: receivelog.c:31
uint64 XLogSegNo
Definition: xlogdefs.h:48
WalWriteMethod * walmethod
Definition: receivelog.h:46
#define ngettext(s, p, n)
Definition: c.h:1182
static void * fn(void *arg)
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
void pg_free(void *ptr)
Definition: fe_memutils.c:105
uint32 WalSegSz
Definition: streamutil.c:34
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
int(* compression)(void)
Definition: walmethods.h:62
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define pg_log_fatal(...)
Definition: logging.h:76

◆ ProcessKeepaliveMsg()

static bool ProcessKeepaliveMsg ( PGconn conn,
StreamCtl stream,
char *  copybuf,
int  len,
XLogRecPtr  blockpos,
TimestampTz last_status 
)
static

Definition at line 986 of file receivelog.c.

References current_walfile_name, feGetCurrentTimestamp(), WalWriteMethod::getlasterror, lastFlushPosition, now(), pg_log_error, pg_log_fatal, reportFlushPosition, sendFeedback(), still_sending, WalWriteMethod::sync, walfile, and StreamCtl::walmethod.

Referenced by HandleCopyStream().

988 {
989  int pos;
990  bool replyRequested;
992 
993  /*
994  * Parse the keepalive message, enclosed in the CopyData message. We just
995  * check if the server requested a reply, and ignore the rest.
996  */
997  pos = 1; /* skip msgtype 'k' */
998  pos += 8; /* skip walEnd */
999  pos += 8; /* skip sendTime */
1000 
1001  if (len < pos + 1)
1002  {
1003  pg_log_error("streaming header too small: %d", len);
1004  return false;
1005  }
1006  replyRequested = copybuf[pos];
1007 
1008  /* If the server requested an immediate reply, send one. */
1009  if (replyRequested && still_sending)
1010  {
1011  if (reportFlushPosition && lastFlushPosition < blockpos &&
1012  walfile != NULL)
1013  {
1014  /*
1015  * If a valid flush location needs to be reported, flush the
1016  * current WAL file so that the latest flush location is sent back
1017  * to the server. This is necessary to see whether the last WAL
1018  * data has been successfully replicated or not, at the normal
1019  * shutdown of the server.
1020  */
1021  if (stream->walmethod->sync(walfile) != 0)
1022  {
1023  pg_log_fatal("could not fsync file \"%s\": %s",
1025  exit(1);
1026  }
1027  lastFlushPosition = blockpos;
1028  }
1029 
1030  now = feGetCurrentTimestamp();
1031  if (!sendFeedback(conn, blockpos, now, false))
1032  return false;
1033  *last_status = now;
1034  }
1035 
1036  return true;
1037 }
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:612
static bool still_sending
Definition: receivelog.c:36
int64 TimestampTz
Definition: timestamp.h:39
#define pg_log_error(...)
Definition: logging.h:80
int(* sync)(Walfile f)
Definition: walmethods.h:76
const char *(* getlasterror)(void)
Definition: walmethods.h:87
static Walfile * walfile
Definition: receivelog.c:31
static bool reportFlushPosition
Definition: receivelog.c:33
WalWriteMethod * walmethod
Definition: receivelog.h:46
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:328
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
StringInfo copybuf
Definition: tablesync.c:124
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define pg_log_fatal(...)
Definition: logging.h:76

◆ ProcessXLogDataMsg()

static bool ProcessXLogDataMsg ( PGconn conn,
StreamCtl stream,
char *  copybuf,
int  len,
XLogRecPtr blockpos 
)
static

Definition at line 1043 of file receivelog.c.

References close_walfile(), current_walfile_name, fe_recvint64(), WalWriteMethod::get_current_pos, WalWriteMethod::getlasterror, open_walfile(), pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, WalWriteMethod::write, and XLogSegmentOffset.

Referenced by HandleCopyStream().

1045 {
1046  int xlogoff;
1047  int bytes_left;
1048  int bytes_written;
1049  int hdr_len;
1050 
1051  /*
1052  * Once we've decided we don't want to receive any more, just ignore any
1053  * subsequent XLogData messages.
1054  */
1055  if (!(still_sending))
1056  return true;
1057 
1058  /*
1059  * Read the header of the XLogData message, enclosed in the CopyData
1060  * message. We only need the WAL location field (dataStart), the rest of
1061  * the header is ignored.
1062  */
1063  hdr_len = 1; /* msgtype 'w' */
1064  hdr_len += 8; /* dataStart */
1065  hdr_len += 8; /* walEnd */
1066  hdr_len += 8; /* sendTime */
1067  if (len < hdr_len)
1068  {
1069  pg_log_error("streaming header too small: %d", len);
1070  return false;
1071  }
1072  *blockpos = fe_recvint64(&copybuf[1]);
1073 
1074  /* Extract WAL location for this block */
1075  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1076 
1077  /*
1078  * Verify that the initial location in the stream matches where we think
1079  * we are.
1080  */
1081  if (walfile == NULL)
1082  {
1083  /* No file open yet */
1084  if (xlogoff != 0)
1085  {
1086  pg_log_error("received write-ahead log record for offset %u with no file open",
1087  xlogoff);
1088  return false;
1089  }
1090  }
1091  else
1092  {
1093  /* More data in existing segment */
1094  if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1095  {
1096  pg_log_error("got WAL data offset %08x, expected %08x",
1097  xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1098  return false;
1099  }
1100  }
1101 
1102  bytes_left = len - hdr_len;
1103  bytes_written = 0;
1104 
1105  while (bytes_left)
1106  {
1107  int bytes_to_write;
1108 
1109  /*
1110  * If crossing a WAL boundary, only write up until we reach wal
1111  * segment size.
1112  */
1113  if (xlogoff + bytes_left > WalSegSz)
1114  bytes_to_write = WalSegSz - xlogoff;
1115  else
1116  bytes_to_write = bytes_left;
1117 
1118  if (walfile == NULL)
1119  {
1120  if (!open_walfile(stream, *blockpos))
1121  {
1122  /* Error logged by open_walfile */
1123  return false;
1124  }
1125  }
1126 
1127  if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1128  bytes_to_write) != bytes_to_write)
1129  {
1130  pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1131  bytes_to_write, current_walfile_name,
1132  stream->walmethod->getlasterror());
1133  return false;
1134  }
1135 
1136  /* Write was successful, advance our position */
1137  bytes_written += bytes_to_write;
1138  bytes_left -= bytes_to_write;
1139  *blockpos += bytes_to_write;
1140  xlogoff += bytes_to_write;
1141 
1142  /* Did we reach the end of a WAL segment? */
1143  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1144  {
1145  if (!close_walfile(stream, *blockpos))
1146  /* Error message written in close_walfile() */
1147  return false;
1148 
1149  xlogoff = 0;
1150 
1151  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1152  {
1153  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1154  {
1155  pg_log_error("could not send copy-end packet: %s",
1156  PQerrorMessage(conn));
1157  return false;
1158  }
1159  still_sending = false;
1160  return true; /* ignore the rest of this XLogData packet */
1161  }
1162  }
1163  }
1164  /* No more data left to write, receive next copy packet */
1165 
1166  return true;
1167 }
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:88
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
static bool still_sending
Definition: receivelog.c:36
#define pg_log_error(...)
Definition: logging.h:80
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2600
TimeLineID timeline
Definition: receivelog.h:32
const char *(* getlasterror)(void)
Definition: walmethods.h:87
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:71
static Walfile * walfile
Definition: receivelog.c:31
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:68
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:186
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
StringInfo copybuf
Definition: tablesync.c:124
uint32 WalSegSz
Definition: streamutil.c:34
int64 fe_recvint64(char *buf)
Definition: streamutil.c:677
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32

◆ ReadEndOfStreamingResult()

static bool ReadEndOfStreamingResult ( PGresult res,
XLogRecPtr startpos,
uint32 timeline 
)
static

Definition at line 694 of file receivelog.c.

References pg_log_error, PQgetvalue(), PQnfields(), and PQntuples().

Referenced by ReceiveXlogStream().

695 {
696  uint32 startpos_xlogid,
697  startpos_xrecoff;
698 
699  /*----------
700  * The result set consists of one row and two columns, e.g:
701  *
702  * next_tli | next_tli_startpos
703  * ----------+-------------------
704  * 4 | 0/9949AE0
705  *
706  * next_tli is the timeline ID of the next timeline after the one that
707  * just finished streaming. next_tli_startpos is the WAL location where
708  * the server switched to it.
709  *----------
710  */
711  if (PQnfields(res) < 2 || PQntuples(res) != 1)
712  {
713  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
714  PQntuples(res), PQnfields(res), 1, 2);
715  return false;
716  }
717 
718  *timeline = atoi(PQgetvalue(res, 0, 0));
719  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
720  &startpos_xrecoff) != 2)
721  {
722  pg_log_error("could not parse next timeline's starting point \"%s\"",
723  PQgetvalue(res, 0, 1));
724  return false;
725  }
726  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
727 
728  return true;
729 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3256
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
#define pg_log_error(...)
Definition: logging.h:80
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
unsigned int uint32
Definition: c.h:441
static XLogRecPtr startpos

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn conn,
StreamCtl stream 
)

Definition at line 444 of file receivelog.c.

References CheckServerVersionForStreaming(), WalWriteMethod::close, CLOSE_NO_RENAME, current_walfile_name, error(), existsTimeLineHistoryFile(), WalWriteMethod::getlasterror, HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

445 {
446  char query[128];
447  char slotcmd[128];
448  PGresult *res;
449  XLogRecPtr stoppos;
450 
451  /*
452  * The caller should've checked the server version already, but doesn't do
453  * any harm to check it here too.
454  */
456  return false;
457 
458  /*
459  * Decide whether we want to report the flush position. If we report the
460  * flush position, the primary will know what WAL we'll possibly
461  * re-request, and it can then remove older WAL safely. We must always do
462  * that when we are using slots.
463  *
464  * Reporting the flush position makes one eligible as a synchronous
465  * replica. People shouldn't include generic names in
466  * synchronous_standby_names, but we've protected them against it so far,
467  * so let's continue to do so unless specifically requested.
468  */
469  if (stream->replication_slot != NULL)
470  {
471  reportFlushPosition = true;
472  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
473  }
474  else
475  {
476  if (stream->synchronous)
477  reportFlushPosition = true;
478  else
479  reportFlushPosition = false;
480  slotcmd[0] = 0;
481  }
482 
483  if (stream->sysidentifier != NULL)
484  {
485  /* Validate system identifier hasn't changed */
486  res = PQexec(conn, "IDENTIFY_SYSTEM");
487  if (PQresultStatus(res) != PGRES_TUPLES_OK)
488  {
489  pg_log_error("could not send replication command \"%s\": %s",
490  "IDENTIFY_SYSTEM", PQerrorMessage(conn));
491  PQclear(res);
492  return false;
493  }
494  if (PQntuples(res) != 1 || PQnfields(res) < 3)
495  {
496  pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
497  PQntuples(res), PQnfields(res), 1, 3);
498  PQclear(res);
499  return false;
500  }
501  if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
502  {
503  pg_log_error("system identifier does not match between base backup and streaming connection");
504  PQclear(res);
505  return false;
506  }
507  if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
508  {
509  pg_log_error("starting timeline %u is not present in the server",
510  stream->timeline);
511  PQclear(res);
512  return false;
513  }
514  PQclear(res);
515  }
516 
517  /*
518  * initialize flush position to starting point, it's the caller's
519  * responsibility that that's sane.
520  */
521  lastFlushPosition = stream->startpos;
522 
523  while (1)
524  {
525  /*
526  * Fetch the timeline history file for this timeline, if we don't have
527  * it already. When streaming log to tar, this will always return
528  * false, as we are never streaming into an existing file and
529  * therefore there can be no pre-existing timeline history file.
530  */
531  if (!existsTimeLineHistoryFile(stream))
532  {
533  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
534  res = PQexec(conn, query);
535  if (PQresultStatus(res) != PGRES_TUPLES_OK)
536  {
537  /* FIXME: we might send it ok, but get an error */
538  pg_log_error("could not send replication command \"%s\": %s",
539  "TIMELINE_HISTORY", PQresultErrorMessage(res));
540  PQclear(res);
541  return false;
542  }
543 
544  /*
545  * The response to TIMELINE_HISTORY is a single row result set
546  * with two fields: filename and content
547  */
548  if (PQnfields(res) != 2 || PQntuples(res) != 1)
549  {
550  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
551  PQntuples(res), PQnfields(res), 1, 2);
552  }
553 
554  /* Write the history file to disk */
556  PQgetvalue(res, 0, 0),
557  PQgetvalue(res, 0, 1));
558 
559  PQclear(res);
560  }
561 
562  /*
563  * Before we start streaming from the requested location, check if the
564  * callback tells us to stop here.
565  */
566  if (stream->stream_stop(stream->startpos, stream->timeline, false))
567  return true;
568 
569  /* Initiate the replication stream at specified location */
570  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
571  slotcmd,
572  LSN_FORMAT_ARGS(stream->startpos),
573  stream->timeline);
574  res = PQexec(conn, query);
575  if (PQresultStatus(res) != PGRES_COPY_BOTH)
576  {
577  pg_log_error("could not send replication command \"%s\": %s",
578  "START_REPLICATION", PQresultErrorMessage(res));
579  PQclear(res);
580  return false;
581  }
582  PQclear(res);
583 
584  /* Stream the WAL */
585  res = HandleCopyStream(conn, stream, &stoppos);
586  if (res == NULL)
587  goto error;
588 
589  /*
590  * Streaming finished.
591  *
592  * There are two possible reasons for that: a controlled shutdown, or
593  * we reached the end of the current timeline. In case of
594  * end-of-timeline, the server sends a result set after Copy has
595  * finished, containing information about the next timeline. Read
596  * that, and restart streaming from the next timeline. In case of
597  * controlled shutdown, stop here.
598  */
599  if (PQresultStatus(res) == PGRES_TUPLES_OK)
600  {
601  /*
602  * End-of-timeline. Read the next timeline's ID and starting
603  * position. Usually, the starting position will match the end of
604  * the previous timeline, but there are corner cases like if the
605  * server had sent us half of a WAL record, when it was promoted.
606  * The new timeline will begin at the end of the last complete
607  * record in that case, overlapping the partial WAL record on the
608  * old timeline.
609  */
610  uint32 newtimeline;
611  bool parsed;
612 
613  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
614  PQclear(res);
615  if (!parsed)
616  goto error;
617 
618  /* Sanity check the values the server gave us */
619  if (newtimeline <= stream->timeline)
620  {
621  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
622  newtimeline, stream->timeline);
623  goto error;
624  }
625  if (stream->startpos > stoppos)
626  {
627  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
628  stream->timeline, LSN_FORMAT_ARGS(stoppos),
629  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
630  goto error;
631  }
632 
633  /* Read the final result, which should be CommandComplete. */
634  res = PQgetResult(conn);
635  if (PQresultStatus(res) != PGRES_COMMAND_OK)
636  {
637  pg_log_error("unexpected termination of replication stream: %s",
638  PQresultErrorMessage(res));
639  PQclear(res);
640  goto error;
641  }
642  PQclear(res);
643 
644  /*
645  * Loop back to start streaming from the new timeline. Always
646  * start streaming at the beginning of a segment.
647  */
648  stream->timeline = newtimeline;
649  stream->startpos = stream->startpos -
651  continue;
652  }
653  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
654  {
655  PQclear(res);
656 
657  /*
658  * End of replication (ie. controlled shut down of the server).
659  *
660  * Check if the callback thinks it's OK to stop here. If not,
661  * complain.
662  */
663  if (stream->stream_stop(stoppos, stream->timeline, false))
664  return true;
665  else
666  {
667  pg_log_error("replication stream was terminated before stop point");
668  goto error;
669  }
670  }
671  else
672  {
673  /* Server returned an error. */
674  pg_log_error("unexpected termination of replication stream: %s",
675  PQresultErrorMessage(res));
676  PQclear(res);
677  goto error;
678  }
679  }
680 
681 error:
682  if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
683  pg_log_error("could not close file \"%s\": %s",
685  walfile = NULL;
686  return false;
687 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3256
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
static void error(void)
Definition: sql-dyntest.c:147
#define pg_log_error(...)
Definition: logging.h:80
char * sysidentifier
Definition: receivelog.h:33
XLogRecPtr startpos
Definition: receivelog.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
TimeLineID timeline
Definition: receivelog.h:32
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
const char *(* getlasterror)(void)
Definition: walmethods.h:87
#define sprintf
Definition: port.h:218
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:694
static Walfile * walfile
Definition: receivelog.c:31
char * replication_slot
Definition: receivelog.h:48
static bool reportFlushPosition
Definition: receivelog.c:33
unsigned int uint32
Definition: c.h:441
stream_stop_callback stream_stop
Definition: receivelog.h:41
WalWriteMethod * walmethod
Definition: receivelog.h:46
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
void PQclear(PGresult *res)
Definition: fe-exec.c:694
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:740
uint64 XLogRecPtr
Definition: xlogdefs.h:21
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:250
bool synchronous
Definition: receivelog.h:36
uint32 WalSegSz
Definition: streamutil.c:34
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3194
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
#define pg_log_warning(...)
Definition: pgfnames.c:24
static char current_walfile_name[MAXPGPATH]
Definition: receivelog.c:32
#define snprintf
Definition: port.h:216
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:267
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:366

◆ sendFeedback()

static bool sendFeedback ( PGconn conn,
XLogRecPtr  blockpos,
TimestampTz  now,
bool  replyRequested 
)
static

Definition at line 328 of file receivelog.c.

References fe_sendint64(), InvalidXLogRecPtr, lastFlushPosition, pg_log_error, PQerrorMessage(), PQflush(), PQputCopyData(), and reportFlushPosition.

Referenced by HandleCopyStream(), and ProcessKeepaliveMsg().

329 {
330  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
331  int len = 0;
332 
333  replybuf[len] = 'r';
334  len += 1;
335  fe_sendint64(blockpos, &replybuf[len]); /* write */
336  len += 8;
338  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
339  else
340  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
341  len += 8;
342  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
343  len += 8;
344  fe_sendint64(now, &replybuf[len]); /* sendTime */
345  len += 8;
346  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
347  len += 1;
348 
349  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
350  {
351  pg_log_error("could not send feedback packet: %s",
352  PQerrorMessage(conn));
353  return false;
354  }
355 
356  return true;
357 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2544
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6737
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define pg_log_error(...)
Definition: logging.h:80
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
static bool reportFlushPosition
Definition: receivelog.c:33
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:34
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:666
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544

◆ writeTimeLineHistoryFile()

static bool writeTimeLineHistoryFile ( StreamCtl stream,
char *  filename,
char *  content 
)
static

Definition at line 267 of file receivelog.c.

References WalWriteMethod::close, CLOSE_NORMAL, CLOSE_UNLINK, WalWriteMethod::getlasterror, StreamCtl::mark_done, mark_file_as_archived(), MAXFNAMELEN, WalWriteMethod::open_for_write, pg_log_error, StreamCtl::timeline, TLHistoryFileName, StreamCtl::walmethod, and WalWriteMethod::write.

Referenced by ReceiveXlogStream().

268 {
269  int size = strlen(content);
270  char histfname[MAXFNAMELEN];
271  Walfile *f;
272 
273  /*
274  * Check that the server's idea of how timeline history files should be
275  * named matches ours.
276  */
277  TLHistoryFileName(histfname, stream->timeline);
278  if (strcmp(histfname, filename) != 0)
279  {
280  pg_log_error("server reported unexpected history file name for timeline %u: %s",
281  stream->timeline, filename);
282  return false;
283  }
284 
285  f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
286  if (f == NULL)
287  {
288  pg_log_error("could not create timeline history file \"%s\": %s",
289  histfname, stream->walmethod->getlasterror());
290  return false;
291  }
292 
293  if ((int) stream->walmethod->write(f, content, size) != size)
294  {
295  pg_log_error("could not write timeline history file \"%s\": %s",
296  histfname, stream->walmethod->getlasterror());
297 
298  /*
299  * If we fail to make the file, delete it to release disk space
300  */
301  stream->walmethod->close(f, CLOSE_UNLINK);
302 
303  return false;
304  }
305 
306  if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
307  {
308  pg_log_error("could not close file \"%s\": %s",
309  histfname, stream->walmethod->getlasterror());
310  return false;
311  }
312 
313  /* Maintain archive_status, check close_walfile() for details. */
314  if (stream->mark_done)
315  {
316  /* writes error message if failed */
317  if (!mark_file_as_archived(stream, histfname))
318  return false;
319  }
320 
321  return true;
322 }
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
#define pg_log_error(...)
Definition: logging.h:80
TimeLineID timeline
Definition: receivelog.h:32
const char *(* getlasterror)(void)
Definition: walmethods.h:87
bool mark_done
Definition: receivelog.h:37
#define TLHistoryFileName(fname, tli)
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:68
WalWriteMethod * walmethod
Definition: receivelog.h:46
#define MAXFNAMELEN
static char * filename
Definition: pg_dumpall.c:92
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:57

Variable Documentation

◆ current_walfile_name

char current_walfile_name[MAXPGPATH] = ""
static

◆ lastFlushPosition

XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
static

◆ reportFlushPosition

bool reportFlushPosition = false
static

Definition at line 33 of file receivelog.c.

Referenced by ProcessKeepaliveMsg(), ReceiveXlogStream(), and sendFeedback().

◆ still_sending

◆ walfile