PostgreSQL Source Code  git master
receivelog.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "receivelog.h"
#include "streamutil.h"
Include dependency graph for receivelog.c:

Go to the source code of this file.

Functions

static PGresult * HandleCopyStream (PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
 
static int CopyStreamPoll (PGconn *conn, long timeout_ms, pgsocket stop_socket)
 
static int CopyStreamReceive (PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
 
static bool ProcessKeepaliveMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
 
static bool ProcessXLogDataMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
 
static PGresult * HandleEndOfCopyStream (PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
 
static bool CheckCopyStreamStop (PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
 
static long CalculateCopyStreamSleeptime (TimestampTz now, int standby_message_timeout, TimestampTz last_status)
 
static bool ReadEndOfStreamingResult (PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
 
static bool mark_file_as_archived (StreamCtl *stream, const char *fname)
 
static bool open_walfile (StreamCtl *stream, XLogRecPtr startpoint)
 
static bool close_walfile (StreamCtl *stream, XLogRecPtr pos)
 
static bool existsTimeLineHistoryFile (StreamCtl *stream)
 
static bool writeTimeLineHistoryFile (StreamCtl *stream, char *filename, char *content)
 
static bool sendFeedback (PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
 
bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Variables

static Walfile * walfile = NULL
 
static bool reportFlushPosition = false
 
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
 
static bool still_sending = true
 

Function Documentation

◆ CalculateCopyStreamSleeptime()

static long CalculateCopyStreamSleeptime ( TimestampTz  now,
int  standby_message_timeout,
TimestampTz  last_status 
)
static

Definition at line 1235 of file receivelog.c.

1237 {
1238  TimestampTz status_targettime = 0;
1239  long sleeptime;
1240 
1241  if (standby_message_timeout && still_sending)
1242  status_targettime = last_status +
1243  (standby_message_timeout - 1) * ((int64) 1000);
1244 
1245  if (status_targettime > 0)
1246  {
1247  long secs;
1248  int usecs;
1249 
1250  feTimestampDifference(now,
1251  status_targettime,
1252  &secs,
1253  &usecs);
1254  /* Always sleep at least 1 sec */
1255  if (secs <= 0)
1256  {
1257  secs = 1;
1258  usecs = 0;
1259  }
1260 
1261  sleeptime = secs * 1000 + usecs / 1000;
1262  }
1263  else
1264  sleeptime = -1;
1265 
1266  return sleeptime;
1267 }
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
int64 TimestampTz
Definition: timestamp.h:39
static int standby_message_timeout
static bool still_sending
Definition: receivelog.c:32
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:886

References feTimestampDifference(), now(), standby_message_timeout, and still_sending.

Referenced by HandleCopyStream().

◆ CheckCopyStreamStop()

static bool CheckCopyStreamStop ( PGconn *  conn,
StreamCtl *  stream,
XLogRecPtr  blockpos 
)
static

Definition at line 1210 of file receivelog.c.

1211 {
1212  if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1213  {
1214  if (!close_walfile(stream, blockpos))
1215  {
1216  /* Potential error message is written by close_walfile */
1217  return false;
1218  }
1219  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220  {
1221  pg_log_error("could not send copy-end packet: %s",
1222  PQerrorMessage(conn));
1223  return false;
1224  }
1225  still_sending = false;
1226  }
1227 
1228  return true;
1229 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7212
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
#define pg_log_error(...)
Definition: logging.h:106
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:191
PGconn * conn
Definition: streamutil.c:53
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41

References close_walfile(), conn, pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, and StreamCtl::timeline.

Referenced by HandleCopyStream().

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn *  conn)

Definition at line 374 of file receivelog.c.

375 {
376  int minServerMajor,
377  maxServerMajor;
378  int serverMajor;
379 
380  /*
381  * The message format used in streaming replication changed in 9.3, so we
382  * cannot stream from older servers. And we don't support servers newer
383  * than the client; it might work, but we don't know, so err on the safe
384  * side.
385  */
386  minServerMajor = 903;
387  maxServerMajor = PG_VERSION_NUM / 100;
388  serverMajor = PQserverVersion(conn) / 100;
389  if (serverMajor < minServerMajor)
390  {
391  const char *serverver = PQparameterStatus(conn, "server_version");
392 
393  pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394  serverver ? serverver : "'unknown'",
395  "9.3");
396  return false;
397  }
398  else if (serverMajor > maxServerMajor)
399  {
400  const char *serverver = PQparameterStatus(conn, "server_version");
401 
402  pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403  serverver ? serverver : "'unknown'",
404  PG_VERSION);
405  return false;
406  }
407  return true;
408 }
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7167
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7202

References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

◆ close_walfile()

static bool close_walfile ( StreamCtl *  stream,
XLogRecPtr  pos 
)
static

Definition at line 191 of file receivelog.c.

192 {
193  char *fn;
194  off_t currpos;
195  int r;
196  char walfile_name[MAXPGPATH];
197 
198  if (walfile == NULL)
199  return true;
200 
201  strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
202  currpos = walfile->currpos;
203 
204  /* Note that this considers the compression used if necessary */
205  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
206  walfile_name,
207  stream->partial_suffix);
208 
209  if (stream->partial_suffix)
210  {
211  if (currpos == WalSegSz)
212  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
213  else
214  {
215  pg_log_info("not renaming \"%s\", segment is not complete", fn);
216  r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
217  }
218  }
219  else
220  r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
221 
222  walfile = NULL;
223 
224  if (r != 0)
225  {
226  pg_log_error("could not close file \"%s\": %s",
227  fn, GetLastWalMethodError(stream->walmethod));
228 
229  pg_free(fn);
230  return false;
231  }
232 
233  pg_free(fn);
234 
235  /*
236  * Mark file as archived if requested by the caller - pg_basebackup needs
237  * to do so as files can otherwise get archived again after promotion of a
238  * new node. This is in line with walreceiver.c always doing a
239  * XLogArchiveForceDone() after a complete segment.
240  */
241  if (currpos == WalSegSz && stream->mark_done)
242  {
243  /* writes error message if failed */
244  if (!mark_file_as_archived(stream, walfile_name))
245  return false;
246  }
247 
248  lastFlushPosition = pos;
249  return true;
250 }
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_log_info(...)
Definition: logging.h:124
#define MAXPGPATH
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:53
static Walfile * walfile
Definition: receivelog.c:28
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:30
int WalSegSz
Definition: streamutil.c:32
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
const WalWriteMethodOps * ops
Definition: walmethods.h:105
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
static void * fn(void *arg)
Definition: thread-alloc.c:119
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
@ CLOSE_NORMAL
Definition: walmethods.h:33

References WalWriteMethodOps::close, CLOSE_NO_RENAME, CLOSE_NORMAL, Walfile::currpos, fn(), WalWriteMethodOps::get_file_name, GetLastWalMethodError(), lastFlushPosition, StreamCtl::mark_done, mark_file_as_archived(), MAXPGPATH, WalWriteMethod::ops, StreamCtl::partial_suffix, Walfile::pathname, pg_free(), pg_log_error, pg_log_info, strlcpy(), walfile, StreamCtl::walmethod, and WalSegSz.

Referenced by CheckCopyStreamStop(), HandleEndOfCopyStream(), and ProcessXLogDataMsg().

◆ CopyStreamPoll()

static int CopyStreamPoll ( PGconn *  conn,
long  timeout_ms,
pgsocket  stop_socket 
)
static

Definition at line 869 of file receivelog.c.

870 {
871  int ret;
872  fd_set input_mask;
873  int connsocket;
874  int maxfd;
875  struct timeval timeout;
876  struct timeval *timeoutptr;
877 
878  connsocket = PQsocket(conn);
879  if (connsocket < 0)
880  {
881  pg_log_error("invalid socket: %s", PQerrorMessage(conn));
882  return -1;
883  }
884 
885  FD_ZERO(&input_mask);
886  FD_SET(connsocket, &input_mask);
887  maxfd = connsocket;
888  if (stop_socket != PGINVALID_SOCKET)
889  {
890  FD_SET(stop_socket, &input_mask);
891  maxfd = Max(maxfd, stop_socket);
892  }
893 
894  if (timeout_ms < 0)
895  timeoutptr = NULL;
896  else
897  {
898  timeout.tv_sec = timeout_ms / 1000L;
899  timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900  timeoutptr = &timeout;
901  }
902 
903  ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
904 
905  if (ret < 0)
906  {
907  if (errno == EINTR)
908  return 0; /* Got a signal, so not an error */
909  pg_log_error("%s() failed: %m", "select");
910  return -1;
911  }
912  if (ret > 0 && FD_ISSET(connsocket, &input_mask))
913  return 1; /* Got input on connection socket */
914 
915  return 0; /* Got timeout or input on stop_socket */
916 }
#define Max(x, y)
Definition: c.h:989
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7238
#define PGINVALID_SOCKET
Definition: port.h:31
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513

References conn, EINTR, Max, pg_log_error, PGINVALID_SOCKET, PQerrorMessage(), PQsocket(), and select.

Referenced by CopyStreamReceive().

◆ CopyStreamReceive()

static int CopyStreamReceive ( PGconn *  conn,
long  timeout,
pgsocket  stop_socket,
char **  buffer 
)
static

Definition at line 931 of file receivelog.c.

933 {
934  char *copybuf = NULL;
935  int rawlen;
936 
937  PQfreemem(*buffer);
938  *buffer = NULL;
939 
940  /* Try to receive a CopyData message */
941  rawlen = PQgetCopyData(conn, &copybuf, 1);
942  if (rawlen == 0)
943  {
944  int ret;
945 
946  /*
947  * No data available. Wait for some to appear, but not longer than
948  * the specified timeout, so that we can ping the server. Also stop
949  * waiting if input appears on stop_socket.
950  */
951  ret = CopyStreamPoll(conn, timeout, stop_socket);
952  if (ret <= 0)
953  return ret;
954 
955  /* Now there is actually data on the socket */
956  if (PQconsumeInput(conn) == 0)
957  {
958  pg_log_error("could not receive data from WAL stream: %s",
959  PQerrorMessage(conn));
960  return -1;
961  }
962 
963  /* Now that we've consumed some input, try again */
964  rawlen = PQgetCopyData(conn, &copybuf, 1);
965  if (rawlen == 0)
966  return 0;
967  }
968  if (rawlen == -1) /* end-of-streaming or error */
969  return -2;
970  if (rawlen == -2)
971  {
972  pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
973  return -1;
974  }
975 
976  /* Return received messages to caller */
977  *buffer = copybuf;
978  return rawlen;
979 }
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:869
static StringInfo copybuf
Definition: tablesync.c:137

References conn, copybuf, CopyStreamPoll(), pg_log_error, PQconsumeInput(), PQerrorMessage(), PQfreemem(), and PQgetCopyData().

Referenced by HandleCopyStream().

◆ existsTimeLineHistoryFile()

static bool existsTimeLineHistoryFile ( StreamCtl *  stream)
static

Definition at line 257 of file receivelog.c.

258 {
259  char histfname[MAXFNAMELEN];
260 
261  /*
262  * Timeline 1 never has a history file. We treat that as if it existed,
263  * since we never need to stream it.
264  */
265  if (stream->timeline == 1)
266  return true;
267 
268  TLHistoryFileName(histfname, stream->timeline);
269 
270  return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
271 }
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:58
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)

References WalWriteMethodOps::existsfile, MAXFNAMELEN, WalWriteMethod::ops, StreamCtl::timeline, TLHistoryFileName(), and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

◆ HandleCopyStream()

static PGresult * HandleCopyStream ( PGconn *  conn,
StreamCtl *  stream,
XLogRecPtr *  stoppos 
)
static

Definition at line 744 of file receivelog.c.

746 {
747  char *copybuf = NULL;
748  TimestampTz last_status = -1;
749  XLogRecPtr blockpos = stream->startpos;
750 
751  still_sending = true;
752 
753  while (1)
754  {
755  int r;
756  TimestampTz now;
757  long sleeptime;
758 
759  /*
760  * Check if we should continue streaming, or abort at this point.
761  */
762  if (!CheckCopyStreamStop(conn, stream, blockpos))
763  goto error;
764 
765  now = feGetCurrentTimestamp();
766 
767  /*
768  * If synchronous option is true, issue sync command as soon as there
769  * are WAL data which has not been flushed yet.
770  */
771  if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
772  {
773  if (stream->walmethod->ops->sync(walfile) != 0)
774  pg_fatal("could not fsync file \"%s\": %s",
775  walfile->pathname, GetLastWalMethodError(stream->walmethod));
776  lastFlushPosition = blockpos;
777 
778  /*
779  * Send feedback so that the server sees the latest WAL locations
780  * immediately.
781  */
782  if (!sendFeedback(conn, blockpos, now, false))
783  goto error;
784  last_status = now;
785  }
786 
787  /*
788  * Potentially send a status message to the primary
789  */
790  if (still_sending && stream->standby_message_timeout > 0 &&
791  feTimestampDifferenceExceeds(last_status, now,
792  stream->standby_message_timeout))
793  {
794  /* Time to send feedback! */
795  if (!sendFeedback(conn, blockpos, now, false))
796  goto error;
797  last_status = now;
798  }
799 
800  /*
801  * Calculate how long send/receive loops should sleep
802  */
803  sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
804  last_status);
805 
806  r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
807  while (r != 0)
808  {
809  if (r == -1)
810  goto error;
811  if (r == -2)
812  {
813  PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814 
815  if (res == NULL)
816  goto error;
817  else
818  return res;
819  }
820 
821  /* Check the message type. */
822  if (copybuf[0] == 'k')
823  {
824  if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825  &last_status))
826  goto error;
827  }
828  else if (copybuf[0] == 'w')
829  {
830  if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831  goto error;
832 
833  /*
834  * Check if we should continue streaming, or abort at this
835  * point.
836  */
837  if (!CheckCopyStreamStop(conn, stream, blockpos))
838  goto error;
839  }
840  else
841  {
842  pg_log_error("unrecognized streaming header: \"%c\"",
843  copybuf[0]);
844  goto error;
845  }
846 
847  /*
848  * Process the received data, and any subsequent data we can read
849  * without blocking.
850  */
851  r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
852  }
853  }
854 
855 error:
856  PQfreemem(copybuf);
857  return NULL;
858 }
#define pg_fatal(...)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:931
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1210
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:985
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1039
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1170
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1235
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:336
static void error(void)
Definition: sql-dyntest.c:147
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:867
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:908
XLogRecPtr startpos
Definition: receivelog.h:31
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
bool synchronous
Definition: receivelog.h:36
int(* sync)(Walfile *f)
Definition: walmethods.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CalculateCopyStreamSleeptime(), CheckCopyStreamStop(), conn, copybuf, CopyStreamReceive(), error(), feGetCurrentTimestamp(), feTimestampDifferenceExceeds(), GetLastWalMethodError(), HandleEndOfCopyStream(), lastFlushPosition, now(), WalWriteMethod::ops, Walfile::pathname, pg_fatal, pg_log_error, PQfreemem(), ProcessKeepaliveMsg(), ProcessXLogDataMsg(), res, sendFeedback(), StreamCtl::standby_message_timeout, StreamCtl::startpos, still_sending, StreamCtl::stop_socket, WalWriteMethodOps::sync, StreamCtl::synchronous, walfile, and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

◆ HandleEndOfCopyStream()

static PGresult * HandleEndOfCopyStream ( PGconn *  conn,
StreamCtl *  stream,
char *  copybuf,
XLogRecPtr  blockpos,
XLogRecPtr *  stoppos 
)
static

Definition at line 1170 of file receivelog.c.

1172 {
1173  PGresult *res = PQgetResult(conn);
1174 
1175  /*
1176  * The server closed its end of the copy stream. If we haven't closed
1177  * ours already, we need to do so now, unless the server threw an error,
1178  * in which case we don't.
1179  */
1180  if (still_sending)
1181  {
1182  if (!close_walfile(stream, blockpos))
1183  {
1184  /* Error message written in close_walfile() */
1185  PQclear(res);
1186  return NULL;
1187  }
1188  if (PQresultStatus(res) == PGRES_COPY_IN)
1189  {
1190  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1191  {
1192  pg_log_error("could not send copy-end packet: %s",
1193  PQerrorMessage(conn));
1194  PQclear(res);
1195  return NULL;
1196  }
1197  res = PQgetResult(conn);
1198  }
1199  still_sending = false;
1200  }
1201  PQfreemem(copybuf);
1202  *stoppos = blockpos;
1203  return res;
1204 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
@ PGRES_COPY_IN
Definition: libpq-fe.h:127

References close_walfile(), conn, copybuf, pg_log_error, PGRES_COPY_IN, PQclear(), PQerrorMessage(), PQflush(), PQfreemem(), PQgetResult(), PQputCopyEnd(), PQresultStatus(), res, and still_sending.

Referenced by HandleCopyStream().

◆ mark_file_as_archived()

static bool mark_file_as_archived ( StreamCtl *  stream,
const char *  fname 
)
static

Definition at line 53 of file receivelog.c.

54 {
55  Walfile *f;
56  static char tmppath[MAXPGPATH];
57 
58  snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59  fname);
60 
61  f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
62  NULL, 0);
63  if (f == NULL)
64  {
65  pg_log_error("could not create archive status file \"%s\": %s",
66  tmppath, GetLastWalMethodError(stream->walmethod));
67  return false;
68  }
69 
70  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
71  {
72  pg_log_error("could not close archive status file \"%s\": %s",
73  tmppath, GetLastWalMethodError(stream->walmethod));
74  return false;
75  }
76 
77  return true;
78 }
#define snprintf
Definition: port.h:238
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49

References WalWriteMethodOps::close, CLOSE_NORMAL, GetLastWalMethodError(), MAXPGPATH, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, pg_log_error, snprintf, and StreamCtl::walmethod.

Referenced by close_walfile(), and writeTimeLineHistoryFile().

◆ open_walfile()

static bool open_walfile ( StreamCtl *  stream,
XLogRecPtr  startpoint 
)
static

Definition at line 89 of file receivelog.c.

90 {
91  Walfile *f;
92  char *fn;
93  ssize_t size;
94  XLogSegNo segno;
95  char walfile_name[MAXPGPATH];
96 
97  XLByteToSeg(startpoint, segno, WalSegSz);
98  XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
99 
100  /* Note that this considers the compression used if necessary */
101  fn = stream->walmethod->ops->get_file_name(stream->walmethod,
102  walfile_name,
103  stream->partial_suffix);
104 
105  /*
106  * When streaming to files, if an existing file exists we verify that it's
107  * either empty (just created), or a complete WalSegSz segment (in which
108  * case it has been created and padded). Anything else indicates a corrupt
109  * file. Compressed files have no need for padding, so just ignore this
110  * case.
111  *
112  * When streaming to tar, no file with this name will exist before, so we
113  * never have to verify a size.
114  */
115  if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
116  stream->walmethod->ops->existsfile(stream->walmethod, fn))
117  {
118  size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
119  if (size < 0)
120  {
121  pg_log_error("could not get size of write-ahead log file \"%s\": %s",
122  fn, GetLastWalMethodError(stream->walmethod));
123  pg_free(fn);
124  return false;
125  }
126  if (size == WalSegSz)
127  {
128  /* Already padded file. Open it for use */
129  f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
130  if (f == NULL)
131  {
132  pg_log_error("could not open existing write-ahead log file \"%s\": %s",
133  fn, GetLastWalMethodError(stream->walmethod));
134  pg_free(fn);
135  return false;
136  }
137 
138  /* fsync file in case of a previous crash */
139  if (stream->walmethod->ops->sync(f) != 0)
140  {
141  pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
142  fn, GetLastWalMethodError(stream->walmethod));
143  stream->walmethod->ops->close(f, CLOSE_UNLINK);
144  exit(1);
145  }
146 
147  walfile = f;
148  pg_free(fn);
149  return true;
150  }
151  if (size != 0)
152  {
153  /* if write didn't set errno, assume problem is no disk space */
154  if (errno == 0)
155  errno = ENOSPC;
156  pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157  "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158  size),
159  fn, size, WalSegSz);
160  pg_free(fn);
161  return false;
162  }
163  /* File existed and was empty, so fall through and open */
164  }
165 
166  /* No file existed, so create one */
167 
168  f = stream->walmethod->ops->open_for_write(stream->walmethod,
169  walfile_name,
170  stream->partial_suffix,
171  WalSegSz);
172  if (f == NULL)
173  {
174  pg_log_error("could not open write-ahead log file \"%s\": %s",
175  fn, GetLastWalMethodError(stream->walmethod));
176  pg_free(fn);
177  return false;
178  }
179 
180  pg_free(fn);
181  walfile = f;
182  return true;
183 }
#define ngettext(s, p, n)
Definition: c.h:1172
@ PG_COMPRESSION_NONE
Definition: compression.h:23
exit(1)
static pg_noinline void Size size
Definition: slab.c:607
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:61
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
@ CLOSE_UNLINK
Definition: walmethods.h:34
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References WalWriteMethodOps::close, CLOSE_UNLINK, WalWriteMethod::compression_algorithm, WalWriteMethodOps::existsfile, exit(), fn(), WalWriteMethodOps::get_file_name, WalWriteMethodOps::get_file_size, GetLastWalMethodError(), MAXPGPATH, ngettext, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, StreamCtl::partial_suffix, PG_COMPRESSION_NONE, pg_free(), pg_log_error, size, WalWriteMethodOps::sync, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, XLByteToSeg, and XLogFileName().

Referenced by ProcessXLogDataMsg().

◆ ProcessKeepaliveMsg()

static bool ProcessKeepaliveMsg ( PGconn *  conn,
StreamCtl *  stream,
char *  copybuf,
int  len,
XLogRecPtr  blockpos,
TimestampTz *  last_status 
)
static

Definition at line 985 of file receivelog.c.

987 {
988  int pos;
989  bool replyRequested;
990  TimestampTz now;
991 
992  /*
993  * Parse the keepalive message, enclosed in the CopyData message. We just
994  * check if the server requested a reply, and ignore the rest.
995  */
996  pos = 1; /* skip msgtype 'k' */
997  pos += 8; /* skip walEnd */
998  pos += 8; /* skip sendTime */
999 
1000  if (len < pos + 1)
1001  {
1002  pg_log_error("streaming header too small: %d", len);
1003  return false;
1004  }
1005  replyRequested = copybuf[pos];
1006 
1007  /* If the server requested an immediate reply, send one. */
1008  if (replyRequested && still_sending)
1009  {
1010  if (reportFlushPosition && lastFlushPosition < blockpos &&
1011  walfile != NULL)
1012  {
1013  /*
1014  * If a valid flush location needs to be reported, flush the
1015  * current WAL file so that the latest flush location is sent back
1016  * to the server. This is necessary to see whether the last WAL
1017  * data has been successfully replicated or not, at the normal
1018  * shutdown of the server.
1019  */
1020  if (stream->walmethod->ops->sync(walfile) != 0)
1021  pg_fatal("could not fsync file \"%s\": %s",
1022  walfile->pathname, GetLastWalMethodError(stream->walmethod));
1023  lastFlushPosition = blockpos;
1024  }
1025 
1026  now = feGetCurrentTimestamp();
1027  if (!sendFeedback(conn, blockpos, now, false))
1028  return false;
1029  *last_status = now;
1030  }
1031 
1032  return true;
1033 }
const void size_t len
static bool reportFlushPosition
Definition: receivelog.c:29

References conn, copybuf, feGetCurrentTimestamp(), GetLastWalMethodError(), lastFlushPosition, len, now(), WalWriteMethod::ops, Walfile::pathname, pg_fatal, pg_log_error, reportFlushPosition, sendFeedback(), still_sending, WalWriteMethodOps::sync, walfile, and StreamCtl::walmethod.

Referenced by HandleCopyStream().

◆ ProcessXLogDataMsg()

static bool ProcessXLogDataMsg ( PGconn *  conn,
StreamCtl *  stream,
char *  copybuf,
int  len,
XLogRecPtr *  blockpos 
)
static

Definition at line 1039 of file receivelog.c.

1041 {
1042  int xlogoff;
1043  int bytes_left;
1044  int bytes_written;
1045  int hdr_len;
1046 
1047  /*
1048  * Once we've decided we don't want to receive any more, just ignore any
1049  * subsequent XLogData messages.
1050  */
1051  if (!(still_sending))
1052  return true;
1053 
1054  /*
1055  * Read the header of the XLogData message, enclosed in the CopyData
1056  * message. We only need the WAL location field (dataStart), the rest of
1057  * the header is ignored.
1058  */
1059  hdr_len = 1; /* msgtype 'w' */
1060  hdr_len += 8; /* dataStart */
1061  hdr_len += 8; /* walEnd */
1062  hdr_len += 8; /* sendTime */
1063  if (len < hdr_len)
1064  {
1065  pg_log_error("streaming header too small: %d", len);
1066  return false;
1067  }
1068  *blockpos = fe_recvint64(&copybuf[1]);
1069 
1070  /* Extract WAL location for this block */
1071  xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1072 
1073  /*
1074  * Verify that the initial location in the stream matches where we think
1075  * we are.
1076  */
1077  if (walfile == NULL)
1078  {
1079  /* No file open yet */
1080  if (xlogoff != 0)
1081  {
1082  pg_log_error("received write-ahead log record for offset %u with no file open",
1083  xlogoff);
1084  return false;
1085  }
1086  }
1087  else
1088  {
1089  /* More data in existing segment */
1090  if (walfile->currpos != xlogoff)
1091  {
1092  pg_log_error("got WAL data offset %08x, expected %08x",
1093  xlogoff, (int) walfile->currpos);
1094  return false;
1095  }
1096  }
1097 
1098  bytes_left = len - hdr_len;
1099  bytes_written = 0;
1100 
1101  while (bytes_left)
1102  {
1103  int bytes_to_write;
1104 
1105  /*
1106  * If crossing a WAL boundary, only write up until we reach wal
1107  * segment size.
1108  */
1109  if (xlogoff + bytes_left > WalSegSz)
1110  bytes_to_write = WalSegSz - xlogoff;
1111  else
1112  bytes_to_write = bytes_left;
1113 
1114  if (walfile == NULL)
1115  {
1116  if (!open_walfile(stream, *blockpos))
1117  {
1118  /* Error logged by open_walfile */
1119  return false;
1120  }
1121  }
1122 
1123  if (stream->walmethod->ops->write(walfile,
1124  copybuf + hdr_len + bytes_written,
1125  bytes_to_write) != bytes_to_write)
1126  {
1127  pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1128  bytes_to_write, walfile->pathname,
1129  GetLastWalMethodError(stream->walmethod));
1130  return false;
1131  }
1132 
1133  /* Write was successful, advance our position */
1134  bytes_written += bytes_to_write;
1135  bytes_left -= bytes_to_write;
1136  *blockpos += bytes_to_write;
1137  xlogoff += bytes_to_write;
1138 
1139  /* Did we reach the end of a WAL segment? */
1140  if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1141  {
1142  if (!close_walfile(stream, *blockpos))
1143  /* Error message written in close_walfile() */
1144  return false;
1145 
1146  xlogoff = 0;
1147 
1148  if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1149  {
1150  if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1151  {
1152  pg_log_error("could not send copy-end packet: %s",
1153  PQerrorMessage(conn));
1154  return false;
1155  }
1156  still_sending = false;
1157  return true; /* ignore the rest of this XLogData packet */
1158  }
1159  }
1160  }
1161  /* No more data left to write, receive next copy packet */
1162 
1163  return true;
1164 }
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:89
int64 fe_recvint64(char *buf)
Definition: streamutil.c:932
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
Definition: walmethods.h:73
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

References close_walfile(), conn, copybuf, Walfile::currpos, fe_recvint64(), GetLastWalMethodError(), len, open_walfile(), WalWriteMethod::ops, Walfile::pathname, pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, WalWriteMethodOps::write, and XLogSegmentOffset.

Referenced by HandleCopyStream().

◆ ReadEndOfStreamingResult()

static bool ReadEndOfStreamingResult ( PGresult * res,
XLogRecPtr * startpos,
uint32 * timeline 
)
static

Definition at line 698 of file receivelog.c.

699 {
700  uint32 startpos_xlogid,
701  startpos_xrecoff;
702 
703  /*----------
704  * The result set consists of one row and two columns, e.g:
705  *
706  * next_tli | next_tli_startpos
707  * ----------+-------------------
708  * 4 | 0/9949AE0
709  *
710  * next_tli is the timeline ID of the next timeline after the one that
711  * just finished streaming. next_tli_startpos is the WAL location where
712  * the server switched to it.
713  *----------
714  */
715  if (PQnfields(res) < 2 || PQntuples(res) != 1)
716  {
717  pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
718  PQntuples(res), PQnfields(res), 1, 2);
719  return false;
720  }
721 
722  *timeline = atoi(PQgetvalue(res, 0, 0));
723  if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724  &startpos_xrecoff) != 2)
725  {
726  pg_log_error("could not parse next timeline's starting point \"%s\"",
727  PQgetvalue(res, 0, 1));
728  return false;
729  }
730  *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731 
732  return true;
733 }
unsigned int uint32
Definition: c.h:506
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
static XLogRecPtr startpos

References pg_log_error, PQgetvalue(), PQnfields(), PQntuples(), res, and startpos.

Referenced by ReceiveXlogStream().

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream 
)

Definition at line 452 of file receivelog.c.

453 {
454  char query[128];
455  char slotcmd[128];
456  PGresult *res;
457  XLogRecPtr stoppos;
458 
459  /*
460  * The caller should've checked the server version already, but doesn't do
461  * any harm to check it here too.
462  */
464  return false;
465 
466  /*
467  * Decide whether we want to report the flush position. If we report the
468  * flush position, the primary will know what WAL we'll possibly
469  * re-request, and it can then remove older WAL safely. We must always do
470  * that when we are using slots.
471  *
472  * Reporting the flush position makes one eligible as a synchronous
473  * replica. People shouldn't include generic names in
474  * synchronous_standby_names, but we've protected them against it so far,
475  * so let's continue to do so unless specifically requested.
476  */
477  if (stream->replication_slot != NULL)
478  {
479  reportFlushPosition = true;
480  sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
481  }
482  else
483  {
484  if (stream->synchronous)
485  reportFlushPosition = true;
486  else
487  reportFlushPosition = false;
488  slotcmd[0] = 0;
489  }
490 
491  if (stream->sysidentifier != NULL)
492  {
493  char *sysidentifier = NULL;
494  TimeLineID servertli;
495 
496  /*
497  * Get the server system identifier and timeline, and validate them.
498  */
499  if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
500  {
501  pg_free(sysidentifier);
502  return false;
503  }
504 
505  if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506  {
507  pg_log_error("system identifier does not match between base backup and streaming connection");
508  pg_free(sysidentifier);
509  return false;
510  }
511  pg_free(sysidentifier);
512 
513  if (stream->timeline > servertli)
514  {
515  pg_log_error("starting timeline %u is not present in the server",
516  stream->timeline);
517  return false;
518  }
519  }
520 
521  /*
522  * initialize flush position to starting point, it's the caller's
523  * responsibility that that's sane.
524  */
525  lastFlushPosition = stream->startpos;
526 
527  while (1)
528  {
529  /*
530  * Fetch the timeline history file for this timeline, if we don't have
531  * it already. When streaming log to tar, this will always return
532  * false, as we are never streaming into an existing file and
533  * therefore there can be no pre-existing timeline history file.
534  */
535  if (!existsTimeLineHistoryFile(stream))
536  {
537  snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
538  res = PQexec(conn, query);
540  {
541  /* FIXME: we might send it ok, but get an error */
542  pg_log_error("could not send replication command \"%s\": %s",
543  "TIMELINE_HISTORY", PQresultErrorMessage(res));
544  PQclear(res);
545  return false;
546  }
547 
548  /*
549  * The response to TIMELINE_HISTORY is a single row result set
550  * with two fields: filename and content
551  */
552  if (PQnfields(res) != 2 || PQntuples(res) != 1)
553  {
554  pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555  PQntuples(res), PQnfields(res), 1, 2);
556  }
557 
558  /* Write the history file to disk */
560  PQgetvalue(res, 0, 0),
561  PQgetvalue(res, 0, 1));
562 
563  PQclear(res);
564  }
565 
566  /*
567  * Before we start streaming from the requested location, check if the
568  * callback tells us to stop here.
569  */
570  if (stream->stream_stop(stream->startpos, stream->timeline, false))
571  return true;
572 
573  /* Initiate the replication stream at specified location */
574  snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575  slotcmd,
576  LSN_FORMAT_ARGS(stream->startpos),
577  stream->timeline);
578  res = PQexec(conn, query);
580  {
581  pg_log_error("could not send replication command \"%s\": %s",
582  "START_REPLICATION", PQresultErrorMessage(res));
583  PQclear(res);
584  return false;
585  }
586  PQclear(res);
587 
588  /* Stream the WAL */
589  res = HandleCopyStream(conn, stream, &stoppos);
590  if (res == NULL)
591  goto error;
592 
593  /*
594  * Streaming finished.
595  *
596  * There are two possible reasons for that: a controlled shutdown, or
597  * we reached the end of the current timeline. In case of
598  * end-of-timeline, the server sends a result set after Copy has
599  * finished, containing information about the next timeline. Read
600  * that, and restart streaming from the next timeline. In case of
601  * controlled shutdown, stop here.
602  */
604  {
605  /*
606  * End-of-timeline. Read the next timeline's ID and starting
607  * position. Usually, the starting position will match the end of
608  * the previous timeline, but there are corner cases like if the
609  * server had sent us half of a WAL record, when it was promoted.
610  * The new timeline will begin at the end of the last complete
611  * record in that case, overlapping the partial WAL record on the
612  * old timeline.
613  */
614  uint32 newtimeline;
615  bool parsed;
616 
617  parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
618  PQclear(res);
619  if (!parsed)
620  goto error;
621 
622  /* Sanity check the values the server gave us */
623  if (newtimeline <= stream->timeline)
624  {
625  pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626  newtimeline, stream->timeline);
627  goto error;
628  }
629  if (stream->startpos > stoppos)
630  {
631  pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
632  stream->timeline, LSN_FORMAT_ARGS(stoppos),
633  newtimeline, LSN_FORMAT_ARGS(stream->startpos));
634  goto error;
635  }
636 
637  /* Read the final result, which should be CommandComplete. */
638  res = PQgetResult(conn);
640  {
641  pg_log_error("unexpected termination of replication stream: %s",
643  PQclear(res);
644  goto error;
645  }
646  PQclear(res);
647 
648  /*
649  * Loop back to start streaming from the new timeline. Always
650  * start streaming at the beginning of a segment.
651  */
652  stream->timeline = newtimeline;
653  stream->startpos = stream->startpos -
655  continue;
656  }
657  else if (PQresultStatus(res) == PGRES_COMMAND_OK)
658  {
659  PQclear(res);
660 
661  /*
662  * End of replication (ie. controlled shut down of the server).
663  *
664  * Check if the callback thinks it's OK to stop here. If not,
665  * complain.
666  */
667  if (stream->stream_stop(stoppos, stream->timeline, false))
668  return true;
669  else
670  {
671  pg_log_error("replication stream was terminated before stop point");
672  goto error;
673  }
674  }
675  else
676  {
677  /* Server returned an error. */
678  pg_log_error("unexpected termination of replication stream: %s",
680  PQclear(res);
681  goto error;
682  }
683  }
684 
685 error:
686  if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
687  pg_log_error("could not close file \"%s\": %s",
689  walfile = NULL;
690  return false;
691 }
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:132
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:240
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:744
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:257
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:274
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:374
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:698
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:478
char * sysidentifier
Definition: receivelog.h:33
char * replication_slot
Definition: receivelog.h:48
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint32 TimeLineID
Definition: xlogdefs.h:59

References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, res, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

◆ sendFeedback()

static bool sendFeedback ( PGconn * conn,
XLogRecPtr  blockpos,
TimestampTz  now,
bool  replyRequested 
)
static

Definition at line 336 of file receivelog.c.

337 {
338  char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339  int len = 0;
340 
341  replybuf[len] = 'r';
342  len += 1;
343  fe_sendint64(blockpos, &replybuf[len]); /* write */
344  len += 8;
346  fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
347  else
348  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
349  len += 8;
350  fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
351  len += 8;
352  fe_sendint64(now, &replybuf[len]); /* sendTime */
353  len += 8;
354  replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
355  len += 1;
356 
357  if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
358  {
359  pg_log_error("could not send feedback packet: %s",
361  return false;
362  }
363 
364  return true;
365 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:921
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References conn, fe_sendint64(), InvalidXLogRecPtr, lastFlushPosition, len, now(), pg_log_error, PQerrorMessage(), PQflush(), PQputCopyData(), and reportFlushPosition.

Referenced by HandleCopyStream(), and ProcessKeepaliveMsg().

◆ writeTimeLineHistoryFile()

static bool writeTimeLineHistoryFile ( StreamCtl * stream,
char *  filename,
char *  content 
)
static

Definition at line 274 of file receivelog.c.

275 {
276  int size = strlen(content);
277  char histfname[MAXFNAMELEN];
278  Walfile *f;
279 
280  /*
281  * Check that the server's idea of how timeline history files should be
282  * named matches ours.
283  */
284  TLHistoryFileName(histfname, stream->timeline);
285  if (strcmp(histfname, filename) != 0)
286  {
287  pg_log_error("server reported unexpected history file name for timeline %u: %s",
288  stream->timeline, filename);
289  return false;
290  }
291 
292  f = stream->walmethod->ops->open_for_write(stream->walmethod,
293  histfname, ".tmp", 0);
294  if (f == NULL)
295  {
296  pg_log_error("could not create timeline history file \"%s\": %s",
297  histfname, GetLastWalMethodError(stream->walmethod));
298  return false;
299  }
300 
301  if ((int) stream->walmethod->ops->write(f, content, size) != size)
302  {
303  pg_log_error("could not write timeline history file \"%s\": %s",
304  histfname, GetLastWalMethodError(stream->walmethod));
305 
306  /*
307  * If we fail to make the file, delete it to release disk space
308  */
309  stream->walmethod->ops->close(f, CLOSE_UNLINK);
310 
311  return false;
312  }
313 
314  if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
315  {
316  pg_log_error("could not close file \"%s\": %s",
317  histfname, GetLastWalMethodError(stream->walmethod));
318  return false;
319  }
320 
321  /* Maintain archive_status, check close_walfile() for details. */
322  if (stream->mark_done)
323  {
324  /* writes error message if failed */
325  if (!mark_file_as_archived(stream, histfname))
326  return false;
327  }
328 
329  return true;
330 }
static char * filename
Definition: pg_dumpall.c:119

References WalWriteMethodOps::close, CLOSE_NORMAL, CLOSE_UNLINK, filename, GetLastWalMethodError(), StreamCtl::mark_done, mark_file_as_archived(), MAXFNAMELEN, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, pg_log_error, size, StreamCtl::timeline, TLHistoryFileName(), StreamCtl::walmethod, and WalWriteMethodOps::write.

Referenced by ReceiveXlogStream().

Variable Documentation

◆ lastFlushPosition

XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
static

◆ reportFlushPosition

bool reportFlushPosition = false
static

Definition at line 29 of file receivelog.c.

Referenced by ProcessKeepaliveMsg(), ReceiveXlogStream(), and sendFeedback().

◆ still_sending

◆ walfile