PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
receivelog.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "receivelog.h"
#include "streamutil.h"
Include dependency graph for receivelog.c:

Go to the source code of this file.

Functions

static PGresultHandleCopyStream (PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
 
static int CopyStreamPoll (PGconn *conn, long timeout_ms, pgsocket stop_socket)
 
static int CopyStreamReceive (PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
 
static bool ProcessKeepaliveMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
 
static bool ProcessXLogDataMsg (PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
 
static PGresultHandleEndOfCopyStream (PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
 
static bool CheckCopyStreamStop (PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
 
static long CalculateCopyStreamSleeptime (TimestampTz now, int standby_message_timeout, TimestampTz last_status)
 
static bool ReadEndOfStreamingResult (PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
 
static bool mark_file_as_archived (StreamCtl *stream, const char *fname)
 
static bool open_walfile (StreamCtl *stream, XLogRecPtr startpoint)
 
static bool close_walfile (StreamCtl *stream, XLogRecPtr pos)
 
static bool existsTimeLineHistoryFile (StreamCtl *stream)
 
static bool writeTimeLineHistoryFile (StreamCtl *stream, char *filename, char *content)
 
static bool sendFeedback (PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
 
bool CheckServerVersionForStreaming (PGconn *conn)
 
bool ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 

Variables

static Walfilewalfile = NULL
 
static bool reportFlushPosition = false
 
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
 
static bool still_sending = true
 

Function Documentation

◆ CalculateCopyStreamSleeptime()

static long CalculateCopyStreamSleeptime ( TimestampTz  now,
int  standby_message_timeout,
TimestampTz  last_status 
)
static

Definition at line 1235 of file receivelog.c.

1237{
1238 TimestampTz status_targettime = 0;
1239 long sleeptime;
1240
1242 status_targettime = last_status +
1243 (standby_message_timeout - 1) * ((int64) 1000);
1244
1245 if (status_targettime > 0)
1246 {
1247 long secs;
1248 int usecs;
1249
1251 status_targettime,
1252 &secs,
1253 &usecs);
1254 /* Always sleep at least 1 sec */
1255 if (secs <= 0)
1256 {
1257 secs = 1;
1258 usecs = 0;
1259 }
1260
1261 sleeptime = secs * 1000 + usecs / 1000;
1262 }
1263 else
1264 sleeptime = -1;
1265
1266 return sleeptime;
1267}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
int64_t int64
Definition: c.h:482
int64 TimestampTz
Definition: timestamp.h:39
static int standby_message_timeout
static bool still_sending
Definition: receivelog.c:32
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:886

References feTimestampDifference(), now(), standby_message_timeout, and still_sending.

Referenced by HandleCopyStream().

◆ CheckCopyStreamStop()

static bool CheckCopyStreamStop ( PGconn conn,
StreamCtl stream,
XLogRecPtr  blockpos 
)
static

Definition at line 1210 of file receivelog.c.

1211{
1212 if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1213 {
1214 if (!close_walfile(stream, blockpos))
1215 {
1216 /* Potential error message is written by close_walfile */
1217 return false;
1218 }
1219 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220 {
1221 pg_log_error("could not send copy-end packet: %s",
1223 return false;
1224 }
1225 still_sending = false;
1226 }
1227
1228 return true;
1229}
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7209
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2749
#define pg_log_error(...)
Definition: logging.h:106
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
Definition: receivelog.c:191
PGconn * conn
Definition: streamutil.c:53
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41

References close_walfile(), conn, pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, and StreamCtl::timeline.

Referenced by HandleCopyStream().

◆ CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn conn)

Definition at line 374 of file receivelog.c.

375{
376 int minServerMajor,
377 maxServerMajor;
378 int serverMajor;
379
380 /*
381 * The message format used in streaming replication changed in 9.3, so we
382 * cannot stream from older servers. And we don't support servers newer
383 * than the client; it might work, but we don't know, so err on the safe
384 * side.
385 */
386 minServerMajor = 903;
387 maxServerMajor = PG_VERSION_NUM / 100;
388 serverMajor = PQserverVersion(conn) / 100;
389 if (serverMajor < minServerMajor)
390 {
391 const char *serverver = PQparameterStatus(conn, "server_version");
392
393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394 serverver ? serverver : "'unknown'",
395 "9.3");
396 return false;
397 }
398 else if (serverMajor > maxServerMajor)
399 {
400 const char *serverver = PQparameterStatus(conn, "server_version");
401
402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403 serverver ? serverver : "'unknown'",
404 PG_VERSION);
405 return false;
406 }
407 return true;
408}
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7199
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7164

References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

◆ close_walfile()

static bool close_walfile ( StreamCtl stream,
XLogRecPtr  pos 
)
static

Definition at line 191 of file receivelog.c.

192{
193 char *fn;
194 off_t currpos;
195 int r;
196 char walfile_name[MAXPGPATH];
197
198 if (walfile == NULL)
199 return true;
200
201 strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
202 currpos = walfile->currpos;
203
204 /* Note that this considers the compression used if necessary */
205 fn = stream->walmethod->ops->get_file_name(stream->walmethod,
206 walfile_name,
207 stream->partial_suffix);
208
209 if (stream->partial_suffix)
210 {
211 if (currpos == WalSegSz)
212 r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
213 else
214 {
215 pg_log_info("not renaming \"%s\", segment is not complete", fn);
216 r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
217 }
218 }
219 else
220 r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
221
222 walfile = NULL;
223
224 if (r != 0)
225 {
226 pg_log_error("could not close file \"%s\": %s",
228
229 pg_free(fn);
230 return false;
231 }
232
233 pg_free(fn);
234
235 /*
236 * Mark file as archived if requested by the caller - pg_basebackup needs
237 * to do so as files can otherwise get archived again after promotion of a
238 * new node. This is in line with walreceiver.c always doing a
239 * XLogArchiveForceDone() after a complete segment.
240 */
241 if (currpos == WalSegSz && stream->mark_done)
242 {
243 /* writes error message if failed */
244 if (!mark_file_as_archived(stream, walfile_name))
245 return false;
246 }
247
248 lastFlushPosition = pos;
249 return true;
250}
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_log_info(...)
Definition: logging.h:124
#define MAXPGPATH
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
Definition: receivelog.c:53
static Walfile * walfile
Definition: receivelog.c:28
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:30
int WalSegSz
Definition: streamutil.c:32
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool mark_done
Definition: receivelog.h:37
char * partial_suffix
Definition: receivelog.h:47
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
const WalWriteMethodOps * ops
Definition: walmethods.h:105
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
static void * fn(void *arg)
Definition: thread-alloc.c:119
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
@ CLOSE_NORMAL
Definition: walmethods.h:33

References WalWriteMethodOps::close, CLOSE_NO_RENAME, CLOSE_NORMAL, Walfile::currpos, fn(), WalWriteMethodOps::get_file_name, GetLastWalMethodError(), lastFlushPosition, StreamCtl::mark_done, mark_file_as_archived(), MAXPGPATH, WalWriteMethod::ops, StreamCtl::partial_suffix, Walfile::pathname, pg_free(), pg_log_error, pg_log_info, strlcpy(), walfile, StreamCtl::walmethod, and WalSegSz.

Referenced by CheckCopyStreamStop(), HandleEndOfCopyStream(), and ProcessXLogDataMsg().

◆ CopyStreamPoll()

static int CopyStreamPoll ( PGconn conn,
long  timeout_ms,
pgsocket  stop_socket 
)
static

Definition at line 869 of file receivelog.c.

870{
871 int ret;
872 fd_set input_mask;
873 int connsocket;
874 int maxfd;
875 struct timeval timeout;
876 struct timeval *timeoutptr;
877
878 connsocket = PQsocket(conn);
879 if (connsocket < 0)
880 {
881 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
882 return -1;
883 }
884
885 FD_ZERO(&input_mask);
886 FD_SET(connsocket, &input_mask);
887 maxfd = connsocket;
888 if (stop_socket != PGINVALID_SOCKET)
889 {
890 FD_SET(stop_socket, &input_mask);
891 maxfd = Max(maxfd, stop_socket);
892 }
893
894 if (timeout_ms < 0)
895 timeoutptr = NULL;
896 else
897 {
898 timeout.tv_sec = timeout_ms / 1000L;
899 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900 timeoutptr = &timeout;
901 }
902
903 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
904
905 if (ret < 0)
906 {
907 if (errno == EINTR)
908 return 0; /* Got a signal, so not an error */
909 pg_log_error("%s() failed: %m", "select");
910 return -1;
911 }
912 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
913 return 1; /* Got input on connection socket */
914
915 return 0; /* Got timeout or input on stop_socket */
916}
#define Max(x, y)
Definition: c.h:952
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7235
#define PGINVALID_SOCKET
Definition: port.h:31
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513

References conn, EINTR, Max, pg_log_error, PGINVALID_SOCKET, PQerrorMessage(), PQsocket(), and select.

Referenced by CopyStreamReceive().

◆ CopyStreamReceive()

static int CopyStreamReceive ( PGconn conn,
long  timeout,
pgsocket  stop_socket,
char **  buffer 
)
static

Definition at line 931 of file receivelog.c.

933{
934 char *copybuf = NULL;
935 int rawlen;
936
937 PQfreemem(*buffer);
938 *buffer = NULL;
939
940 /* Try to receive a CopyData message */
941 rawlen = PQgetCopyData(conn, &copybuf, 1);
942 if (rawlen == 0)
943 {
944 int ret;
945
946 /*
947 * No data available. Wait for some to appear, but not longer than
948 * the specified timeout, so that we can ping the server. Also stop
949 * waiting if input appears on stop_socket.
950 */
951 ret = CopyStreamPoll(conn, timeout, stop_socket);
952 if (ret <= 0)
953 return ret;
954
955 /* Now there is actually data on the socket */
956 if (PQconsumeInput(conn) == 0)
957 {
958 pg_log_error("could not receive data from WAL stream: %s",
960 return -1;
961 }
962
963 /* Now that we've consumed some input, try again */
964 rawlen = PQgetCopyData(conn, &copybuf, 1);
965 if (rawlen == 0)
966 return 0;
967 }
968 if (rawlen == -1) /* end-of-streaming or error */
969 return -2;
970 if (rawlen == -2)
971 {
972 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
973 return -1;
974 }
975
976 /* Return received messages to caller */
977 *buffer = copybuf;
978 return rawlen;
979}
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2816
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
Definition: receivelog.c:869
static StringInfo copybuf
Definition: tablesync.c:137

References conn, copybuf, CopyStreamPoll(), pg_log_error, PQconsumeInput(), PQerrorMessage(), PQfreemem(), and PQgetCopyData().

Referenced by HandleCopyStream().

◆ existsTimeLineHistoryFile()

static bool existsTimeLineHistoryFile ( StreamCtl stream)
static

Definition at line 257 of file receivelog.c.

258{
259 char histfname[MAXFNAMELEN];
260
261 /*
262 * Timeline 1 never has a history file. We treat that as if it existed,
263 * since we never need to stream it.
264 */
265 if (stream->timeline == 1)
266 return true;
267
268 TLHistoryFileName(histfname, stream->timeline);
269
270 return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
271}
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:58
#define MAXFNAMELEN
static void TLHistoryFileName(char *fname, TimeLineID tli)

References WalWriteMethodOps::existsfile, MAXFNAMELEN, WalWriteMethod::ops, StreamCtl::timeline, TLHistoryFileName(), and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

◆ HandleCopyStream()

static PGresult * HandleCopyStream ( PGconn conn,
StreamCtl stream,
XLogRecPtr stoppos 
)
static

Definition at line 744 of file receivelog.c.

746{
747 char *copybuf = NULL;
748 TimestampTz last_status = -1;
749 XLogRecPtr blockpos = stream->startpos;
750
751 still_sending = true;
752
753 while (1)
754 {
755 int r;
757 long sleeptime;
758
759 /*
760 * Check if we should continue streaming, or abort at this point.
761 */
762 if (!CheckCopyStreamStop(conn, stream, blockpos))
763 goto error;
764
766
767 /*
768 * If synchronous option is true, issue sync command as soon as there
769 * are WAL data which has not been flushed yet.
770 */
771 if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
772 {
773 if (stream->walmethod->ops->sync(walfile) != 0)
774 pg_fatal("could not fsync file \"%s\": %s",
776 lastFlushPosition = blockpos;
777
778 /*
779 * Send feedback so that the server sees the latest WAL locations
780 * immediately.
781 */
782 if (!sendFeedback(conn, blockpos, now, false))
783 goto error;
784 last_status = now;
785 }
786
787 /*
788 * Potentially send a status message to the primary
789 */
790 if (still_sending && stream->standby_message_timeout > 0 &&
793 {
794 /* Time to send feedback! */
795 if (!sendFeedback(conn, blockpos, now, false))
796 goto error;
797 last_status = now;
798 }
799
800 /*
801 * Calculate how long send/receive loops should sleep
802 */
804 last_status);
805
806 r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
807 while (r != 0)
808 {
809 if (r == -1)
810 goto error;
811 if (r == -2)
812 {
813 PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814
815 if (res == NULL)
816 goto error;
817 else
818 return res;
819 }
820
821 /* Check the message type. */
822 if (copybuf[0] == 'k')
823 {
824 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825 &last_status))
826 goto error;
827 }
828 else if (copybuf[0] == 'w')
829 {
830 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831 goto error;
832
833 /*
834 * Check if we should continue streaming, or abort at this
835 * point.
836 */
837 if (!CheckCopyStreamStop(conn, stream, blockpos))
838 goto error;
839 }
840 else
841 {
842 pg_log_error("unrecognized streaming header: \"%c\"",
843 copybuf[0]);
844 goto error;
845 }
846
847 /*
848 * Process the received data, and any subsequent data we can read
849 * without blocking.
850 */
851 r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
852 }
853 }
854
855error:
857 return NULL;
858}
#define pg_fatal(...)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
Definition: receivelog.c:931
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
Definition: receivelog.c:1210
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
Definition: receivelog.c:985
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
Definition: receivelog.c:1039
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
Definition: receivelog.c:1170
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
Definition: receivelog.c:1235
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
Definition: receivelog.c:336
static void error(void)
Definition: sql-dyntest.c:147
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:867
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:908
XLogRecPtr startpos
Definition: receivelog.h:31
pgsocket stop_socket
Definition: receivelog.h:43
int standby_message_timeout
Definition: receivelog.h:35
bool synchronous
Definition: receivelog.h:36
int(* sync)(Walfile *f)
Definition: walmethods.h:78
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CalculateCopyStreamSleeptime(), CheckCopyStreamStop(), conn, copybuf, CopyStreamReceive(), error(), feGetCurrentTimestamp(), feTimestampDifferenceExceeds(), GetLastWalMethodError(), HandleEndOfCopyStream(), lastFlushPosition, now(), WalWriteMethod::ops, Walfile::pathname, pg_fatal, pg_log_error, PQfreemem(), ProcessKeepaliveMsg(), ProcessXLogDataMsg(), res, sendFeedback(), StreamCtl::standby_message_timeout, StreamCtl::startpos, still_sending, StreamCtl::stop_socket, WalWriteMethodOps::sync, StreamCtl::synchronous, walfile, and StreamCtl::walmethod.

Referenced by ReceiveXlogStream().

◆ HandleEndOfCopyStream()

static PGresult * HandleEndOfCopyStream ( PGconn conn,
StreamCtl stream,
char *  copybuf,
XLogRecPtr  blockpos,
XLogRecPtr stoppos 
)
static

Definition at line 1170 of file receivelog.c.

1172{
1174
1175 /*
1176 * The server closed its end of the copy stream. If we haven't closed
1177 * ours already, we need to do so now, unless the server threw an error,
1178 * in which case we don't.
1179 */
1180 if (still_sending)
1181 {
1182 if (!close_walfile(stream, blockpos))
1183 {
1184 /* Error message written in close_walfile() */
1185 PQclear(res);
1186 return NULL;
1187 }
1189 {
1190 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1191 {
1192 pg_log_error("could not send copy-end packet: %s",
1194 PQclear(res);
1195 return NULL;
1196 }
1197 res = PQgetResult(conn);
1198 }
1199 still_sending = false;
1200 }
1202 *stoppos = blockpos;
1203 return res;
1204}
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
@ PGRES_COPY_IN
Definition: libpq-fe.h:127

References close_walfile(), conn, copybuf, pg_log_error, PGRES_COPY_IN, PQclear(), PQerrorMessage(), PQflush(), PQfreemem(), PQgetResult(), PQputCopyEnd(), PQresultStatus(), res, and still_sending.

Referenced by HandleCopyStream().

◆ mark_file_as_archived()

static bool mark_file_as_archived ( StreamCtl stream,
const char *  fname 
)
static

Definition at line 53 of file receivelog.c.

54{
55 Walfile *f;
56 static char tmppath[MAXPGPATH];
57
58 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59 fname);
60
61 f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
62 NULL, 0);
63 if (f == NULL)
64 {
65 pg_log_error("could not create archive status file \"%s\": %s",
66 tmppath, GetLastWalMethodError(stream->walmethod));
67 return false;
68 }
69
70 if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
71 {
72 pg_log_error("could not close archive status file \"%s\": %s",
73 tmppath, GetLastWalMethodError(stream->walmethod));
74 return false;
75 }
76
77 return true;
78}
#define snprintf
Definition: port.h:238
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49

References WalWriteMethodOps::close, CLOSE_NORMAL, GetLastWalMethodError(), MAXPGPATH, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, pg_log_error, snprintf, and StreamCtl::walmethod.

Referenced by close_walfile(), and writeTimeLineHistoryFile().

◆ open_walfile()

static bool open_walfile ( StreamCtl stream,
XLogRecPtr  startpoint 
)
static

Definition at line 89 of file receivelog.c.

90{
91 Walfile *f;
92 char *fn;
93 ssize_t size;
94 XLogSegNo segno;
95 char walfile_name[MAXPGPATH];
96
97 XLByteToSeg(startpoint, segno, WalSegSz);
98 XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
99
100 /* Note that this considers the compression used if necessary */
101 fn = stream->walmethod->ops->get_file_name(stream->walmethod,
102 walfile_name,
103 stream->partial_suffix);
104
105 /*
106 * When streaming to files, if an existing file exists we verify that it's
107 * either empty (just created), or a complete WalSegSz segment (in which
108 * case it has been created and padded). Anything else indicates a corrupt
109 * file. Compressed files have no need for padding, so just ignore this
110 * case.
111 *
112 * When streaming to tar, no file with this name will exist before, so we
113 * never have to verify a size.
114 */
116 stream->walmethod->ops->existsfile(stream->walmethod, fn))
117 {
118 size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
119 if (size < 0)
120 {
121 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 pg_free(fn);
124 return false;
125 }
126 if (size == WalSegSz)
127 {
128 /* Already padded file. Open it for use */
129 f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
130 if (f == NULL)
131 {
132 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 pg_free(fn);
135 return false;
136 }
137
138 /* fsync file in case of a previous crash */
139 if (stream->walmethod->ops->sync(f) != 0)
140 {
141 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 stream->walmethod->ops->close(f, CLOSE_UNLINK);
144 exit(1);
145 }
146
147 walfile = f;
148 pg_free(fn);
149 return true;
150 }
151 if (size != 0)
152 {
153 /* if write didn't set errno, assume problem is no disk space */
154 if (errno == 0)
155 errno = ENOSPC;
156 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158 size),
159 fn, size, WalSegSz);
160 pg_free(fn);
161 return false;
162 }
163 /* File existed and was empty, so fall through and open */
164 }
165
166 /* No file existed, so create one */
167
168 f = stream->walmethod->ops->open_for_write(stream->walmethod,
169 walfile_name,
170 stream->partial_suffix,
171 WalSegSz);
172 if (f == NULL)
173 {
174 pg_log_error("could not open write-ahead log file \"%s\": %s",
176 pg_free(fn);
177 return false;
178 }
179
180 pg_free(fn);
181 walfile = f;
182 return true;
183}
#define ngettext(s, p, n)
Definition: c.h:1135
@ PG_COMPRESSION_NONE
Definition: compression.h:23
exit(1)
static pg_noinline void Size size
Definition: slab.c:607
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.h:61
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
@ CLOSE_UNLINK
Definition: walmethods.h:34
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References WalWriteMethodOps::close, CLOSE_UNLINK, WalWriteMethod::compression_algorithm, WalWriteMethodOps::existsfile, exit(), fn(), WalWriteMethodOps::get_file_name, WalWriteMethodOps::get_file_size, GetLastWalMethodError(), MAXPGPATH, ngettext, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, StreamCtl::partial_suffix, PG_COMPRESSION_NONE, pg_free(), pg_log_error, size, WalWriteMethodOps::sync, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, XLByteToSeg, and XLogFileName().

Referenced by ProcessXLogDataMsg().

◆ ProcessKeepaliveMsg()

static bool ProcessKeepaliveMsg ( PGconn conn,
StreamCtl stream,
char *  copybuf,
int  len,
XLogRecPtr  blockpos,
TimestampTz last_status 
)
static

Definition at line 985 of file receivelog.c.

987{
988 int pos;
989 bool replyRequested;
991
992 /*
993 * Parse the keepalive message, enclosed in the CopyData message. We just
994 * check if the server requested a reply, and ignore the rest.
995 */
996 pos = 1; /* skip msgtype 'k' */
997 pos += 8; /* skip walEnd */
998 pos += 8; /* skip sendTime */
999
1000 if (len < pos + 1)
1001 {
1002 pg_log_error("streaming header too small: %d", len);
1003 return false;
1004 }
1005 replyRequested = copybuf[pos];
1006
1007 /* If the server requested an immediate reply, send one. */
1008 if (replyRequested && still_sending)
1009 {
1010 if (reportFlushPosition && lastFlushPosition < blockpos &&
1011 walfile != NULL)
1012 {
1013 /*
1014 * If a valid flush location needs to be reported, flush the
1015 * current WAL file so that the latest flush location is sent back
1016 * to the server. This is necessary to see whether the last WAL
1017 * data has been successfully replicated or not, at the normal
1018 * shutdown of the server.
1019 */
1020 if (stream->walmethod->ops->sync(walfile) != 0)
1021 pg_fatal("could not fsync file \"%s\": %s",
1023 lastFlushPosition = blockpos;
1024 }
1025
1027 if (!sendFeedback(conn, blockpos, now, false))
1028 return false;
1029 *last_status = now;
1030 }
1031
1032 return true;
1033}
const void size_t len
static bool reportFlushPosition
Definition: receivelog.c:29

References conn, copybuf, feGetCurrentTimestamp(), GetLastWalMethodError(), lastFlushPosition, len, now(), WalWriteMethod::ops, Walfile::pathname, pg_fatal, pg_log_error, reportFlushPosition, sendFeedback(), still_sending, WalWriteMethodOps::sync, walfile, and StreamCtl::walmethod.

Referenced by HandleCopyStream().

◆ ProcessXLogDataMsg()

static bool ProcessXLogDataMsg ( PGconn conn,
StreamCtl stream,
char *  copybuf,
int  len,
XLogRecPtr blockpos 
)
static

Definition at line 1039 of file receivelog.c.

1041{
1042 int xlogoff;
1043 int bytes_left;
1044 int bytes_written;
1045 int hdr_len;
1046
1047 /*
1048 * Once we've decided we don't want to receive any more, just ignore any
1049 * subsequent XLogData messages.
1050 */
1051 if (!(still_sending))
1052 return true;
1053
1054 /*
1055 * Read the header of the XLogData message, enclosed in the CopyData
1056 * message. We only need the WAL location field (dataStart), the rest of
1057 * the header is ignored.
1058 */
1059 hdr_len = 1; /* msgtype 'w' */
1060 hdr_len += 8; /* dataStart */
1061 hdr_len += 8; /* walEnd */
1062 hdr_len += 8; /* sendTime */
1063 if (len < hdr_len)
1064 {
1065 pg_log_error("streaming header too small: %d", len);
1066 return false;
1067 }
1068 *blockpos = fe_recvint64(&copybuf[1]);
1069
1070 /* Extract WAL location for this block */
1071 xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1072
1073 /*
1074 * Verify that the initial location in the stream matches where we think
1075 * we are.
1076 */
1077 if (walfile == NULL)
1078 {
1079 /* No file open yet */
1080 if (xlogoff != 0)
1081 {
1082 pg_log_error("received write-ahead log record for offset %u with no file open",
1083 xlogoff);
1084 return false;
1085 }
1086 }
1087 else
1088 {
1089 /* More data in existing segment */
1090 if (walfile->currpos != xlogoff)
1091 {
1092 pg_log_error("got WAL data offset %08x, expected %08x",
1093 xlogoff, (int) walfile->currpos);
1094 return false;
1095 }
1096 }
1097
1098 bytes_left = len - hdr_len;
1099 bytes_written = 0;
1100
1101 while (bytes_left)
1102 {
1103 int bytes_to_write;
1104
1105 /*
1106 * If crossing a WAL boundary, only write up until we reach wal
1107 * segment size.
1108 */
1109 if (xlogoff + bytes_left > WalSegSz)
1110 bytes_to_write = WalSegSz - xlogoff;
1111 else
1112 bytes_to_write = bytes_left;
1113
1114 if (walfile == NULL)
1115 {
1116 if (!open_walfile(stream, *blockpos))
1117 {
1118 /* Error logged by open_walfile */
1119 return false;
1120 }
1121 }
1122
1123 if (stream->walmethod->ops->write(walfile,
1124 copybuf + hdr_len + bytes_written,
1125 bytes_to_write) != bytes_to_write)
1126 {
1127 pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1128 bytes_to_write, walfile->pathname,
1130 return false;
1131 }
1132
1133 /* Write was successful, advance our position */
1134 bytes_written += bytes_to_write;
1135 bytes_left -= bytes_to_write;
1136 *blockpos += bytes_to_write;
1137 xlogoff += bytes_to_write;
1138
1139 /* Did we reach the end of a WAL segment? */
1140 if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1141 {
1142 if (!close_walfile(stream, *blockpos))
1143 /* Error message written in close_walfile() */
1144 return false;
1145
1146 xlogoff = 0;
1147
1148 if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1149 {
1150 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1151 {
1152 pg_log_error("could not send copy-end packet: %s",
1154 return false;
1155 }
1156 still_sending = false;
1157 return true; /* ignore the rest of this XLogData packet */
1158 }
1159 }
1160 }
1161 /* No more data left to write, receive next copy packet */
1162
1163 return true;
1164}
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
Definition: receivelog.c:89
int64 fe_recvint64(char *buf)
Definition: streamutil.c:932
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
Definition: walmethods.h:73
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

References close_walfile(), conn, copybuf, Walfile::currpos, fe_recvint64(), GetLastWalMethodError(), len, open_walfile(), WalWriteMethod::ops, Walfile::pathname, pg_log_error, PQerrorMessage(), PQflush(), PQputCopyEnd(), still_sending, StreamCtl::stream_stop, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, WalWriteMethodOps::write, and XLogSegmentOffset.

Referenced by HandleCopyStream().

◆ ReadEndOfStreamingResult()

static bool ReadEndOfStreamingResult ( PGresult res,
XLogRecPtr startpos,
uint32 timeline 
)
static

Definition at line 698 of file receivelog.c.

699{
700 uint32 startpos_xlogid,
701 startpos_xrecoff;
702
703 /*----------
704 * The result set consists of one row and two columns, e.g:
705 *
706 * next_tli | next_tli_startpos
707 * ----------+-------------------
708 * 4 | 0/9949AE0
709 *
710 * next_tli is the timeline ID of the next timeline after the one that
711 * just finished streaming. next_tli_startpos is the WAL location where
712 * the server switched to it.
713 *----------
714 */
715 if (PQnfields(res) < 2 || PQntuples(res) != 1)
716 {
717 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
718 PQntuples(res), PQnfields(res), 1, 2);
719 return false;
720 }
721
722 *timeline = atoi(PQgetvalue(res, 0, 0));
723 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724 &startpos_xrecoff) != 2)
725 {
726 pg_log_error("could not parse next timeline's starting point \"%s\"",
727 PQgetvalue(res, 0, 1));
728 return false;
729 }
730 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731
732 return true;
733}
uint64_t uint64
Definition: c.h:486
uint32_t uint32
Definition: c.h:485
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
static XLogRecPtr startpos

References pg_log_error, PQgetvalue(), PQnfields(), PQntuples(), res, and startpos.

Referenced by ReceiveXlogStream().

◆ ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn conn,
StreamCtl stream 
)

Definition at line 452 of file receivelog.c.

453{
454 char query[128];
455 char slotcmd[128];
456 PGresult *res;
457 XLogRecPtr stoppos;
458
459 /*
460 * The caller should've checked the server version already, but doesn't do
461 * any harm to check it here too.
462 */
464 return false;
465
466 /*
467 * Decide whether we want to report the flush position. If we report the
468 * flush position, the primary will know what WAL we'll possibly
469 * re-request, and it can then remove older WAL safely. We must always do
470 * that when we are using slots.
471 *
472 * Reporting the flush position makes one eligible as a synchronous
473 * replica. People shouldn't include generic names in
474 * synchronous_standby_names, but we've protected them against it so far,
475 * so let's continue to do so unless specifically requested.
476 */
477 if (stream->replication_slot != NULL)
478 {
479 reportFlushPosition = true;
480 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
481 }
482 else
483 {
484 if (stream->synchronous)
485 reportFlushPosition = true;
486 else
487 reportFlushPosition = false;
488 slotcmd[0] = 0;
489 }
490
491 if (stream->sysidentifier != NULL)
492 {
493 char *sysidentifier = NULL;
494 TimeLineID servertli;
495
496 /*
497 * Get the server system identifier and timeline, and validate them.
498 */
499 if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
500 {
501 pg_free(sysidentifier);
502 return false;
503 }
504
505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506 {
507 pg_log_error("system identifier does not match between base backup and streaming connection");
508 pg_free(sysidentifier);
509 return false;
510 }
511 pg_free(sysidentifier);
512
513 if (stream->timeline > servertli)
514 {
515 pg_log_error("starting timeline %u is not present in the server",
516 stream->timeline);
517 return false;
518 }
519 }
520
521 /*
522 * initialize flush position to starting point, it's the caller's
523 * responsibility that that's sane.
524 */
525 lastFlushPosition = stream->startpos;
526
527 while (1)
528 {
529 /*
530 * Fetch the timeline history file for this timeline, if we don't have
531 * it already. When streaming log to tar, this will always return
532 * false, as we are never streaming into an existing file and
533 * therefore there can be no pre-existing timeline history file.
534 */
535 if (!existsTimeLineHistoryFile(stream))
536 {
537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
538 res = PQexec(conn, query);
540 {
541 /* FIXME: we might send it ok, but get an error */
542 pg_log_error("could not send replication command \"%s\": %s",
543 "TIMELINE_HISTORY", PQresultErrorMessage(res));
544 PQclear(res);
545 return false;
546 }
547
548 /*
549 * The response to TIMELINE_HISTORY is a single row result set
550 * with two fields: filename and content
551 */
552 if (PQnfields(res) != 2 || PQntuples(res) != 1)
553 {
554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
555 PQntuples(res), PQnfields(res), 1, 2);
556 }
557
558 /* Write the history file to disk */
560 PQgetvalue(res, 0, 0),
561 PQgetvalue(res, 0, 1));
562
563 PQclear(res);
564 }
565
566 /*
567 * Before we start streaming from the requested location, check if the
568 * callback tells us to stop here.
569 */
570 if (stream->stream_stop(stream->startpos, stream->timeline, false))
571 return true;
572
573 /* Initiate the replication stream at specified location */
574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575 slotcmd,
576 LSN_FORMAT_ARGS(stream->startpos),
577 stream->timeline);
578 res = PQexec(conn, query);
580 {
581 pg_log_error("could not send replication command \"%s\": %s",
582 "START_REPLICATION", PQresultErrorMessage(res));
583 PQclear(res);
584 return false;
585 }
586 PQclear(res);
587
588 /* Stream the WAL */
589 res = HandleCopyStream(conn, stream, &stoppos);
590 if (res == NULL)
591 goto error;
592
593 /*
594 * Streaming finished.
595 *
596 * There are two possible reasons for that: a controlled shutdown, or
597 * we reached the end of the current timeline. In case of
598 * end-of-timeline, the server sends a result set after Copy has
599 * finished, containing information about the next timeline. Read
600 * that, and restart streaming from the next timeline. In case of
601 * controlled shutdown, stop here.
602 */
604 {
605 /*
606 * End-of-timeline. Read the next timeline's ID and starting
607 * position. Usually, the starting position will match the end of
608 * the previous timeline, but there are corner cases like if the
609 * server had sent us half of a WAL record, when it was promoted.
610 * The new timeline will begin at the end of the last complete
611 * record in that case, overlapping the partial WAL record on the
612 * old timeline.
613 */
614 uint32 newtimeline;
615 bool parsed;
616
617 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
618 PQclear(res);
619 if (!parsed)
620 goto error;
621
622 /* Sanity check the values the server gave us */
623 if (newtimeline <= stream->timeline)
624 {
625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626 newtimeline, stream->timeline);
627 goto error;
628 }
629 if (stream->startpos > stoppos)
630 {
631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
632 stream->timeline, LSN_FORMAT_ARGS(stoppos),
633 newtimeline, LSN_FORMAT_ARGS(stream->startpos));
634 goto error;
635 }
636
637 /* Read the final result, which should be CommandComplete. */
640 {
641 pg_log_error("unexpected termination of replication stream: %s",
643 PQclear(res);
644 goto error;
645 }
646 PQclear(res);
647
648 /*
649 * Loop back to start streaming from the new timeline. Always
650 * start streaming at the beginning of a segment.
651 */
652 stream->timeline = newtimeline;
653 stream->startpos = stream->startpos -
655 continue;
656 }
658 {
659 PQclear(res);
660
661 /*
662 * End of replication (ie. controlled shut down of the server).
663 *
664 * Check if the callback thinks it's OK to stop here. If not,
665 * complain.
666 */
667 if (stream->stream_stop(stoppos, stream->timeline, false))
668 return true;
669 else
670 {
671 pg_log_error("replication stream was terminated before stop point");
672 goto error;
673 }
674 }
675 else
676 {
677 /* Server returned an error. */
678 pg_log_error("unexpected termination of replication stream: %s",
680 PQclear(res);
681 goto error;
682 }
683 }
684
685error:
686 if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
687 pg_log_error("could not close file \"%s\": %s",
689 walfile = NULL;
690 return false;
691}
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:132
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:240
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:744
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:257
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:274
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:374
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:698
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:478
char * sysidentifier
Definition: receivelog.h:33
char * replication_slot
Definition: receivelog.h:48
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint32 TimeLineID
Definition: xlogdefs.h:59

References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, res, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

◆ sendFeedback()

static bool sendFeedback ( PGconn conn,
XLogRecPtr  blockpos,
TimestampTz  now,
bool  replyRequested 
)
static

Definition at line 336 of file receivelog.c.

337{
338 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339 int len = 0;
340
341 replybuf[len] = 'r';
342 len += 1;
343 fe_sendint64(blockpos, &replybuf[len]); /* write */
344 len += 8;
346 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
347 else
348 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
349 len += 8;
350 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
351 len += 8;
352 fe_sendint64(now, &replybuf[len]); /* sendTime */
353 len += 8;
354 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
355 len += 1;
356
357 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
358 {
359 pg_log_error("could not send feedback packet: %s",
361 return false;
362 }
363
364 return true;
365}
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2695
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:921
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References conn, fe_sendint64(), InvalidXLogRecPtr, lastFlushPosition, len, now(), pg_log_error, PQerrorMessage(), PQflush(), PQputCopyData(), and reportFlushPosition.

Referenced by HandleCopyStream(), and ProcessKeepaliveMsg().

◆ writeTimeLineHistoryFile()

static bool writeTimeLineHistoryFile ( StreamCtl stream,
char *  filename,
char *  content 
)
static

Definition at line 274 of file receivelog.c.

275{
276 int size = strlen(content);
277 char histfname[MAXFNAMELEN];
278 Walfile *f;
279
280 /*
281 * Check that the server's idea of how timeline history files should be
282 * named matches ours.
283 */
284 TLHistoryFileName(histfname, stream->timeline);
285 if (strcmp(histfname, filename) != 0)
286 {
287 pg_log_error("server reported unexpected history file name for timeline %u: %s",
288 stream->timeline, filename);
289 return false;
290 }
291
292 f = stream->walmethod->ops->open_for_write(stream->walmethod,
293 histfname, ".tmp", 0);
294 if (f == NULL)
295 {
296 pg_log_error("could not create timeline history file \"%s\": %s",
297 histfname, GetLastWalMethodError(stream->walmethod));
298 return false;
299 }
300
301 if ((int) stream->walmethod->ops->write(f, content, size) != size)
302 {
303 pg_log_error("could not write timeline history file \"%s\": %s",
304 histfname, GetLastWalMethodError(stream->walmethod));
305
306 /*
307 * If we fail to make the file, delete it to release disk space
308 */
309 stream->walmethod->ops->close(f, CLOSE_UNLINK);
310
311 return false;
312 }
313
314 if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
315 {
316 pg_log_error("could not close file \"%s\": %s",
317 histfname, GetLastWalMethodError(stream->walmethod));
318 return false;
319 }
320
321 /* Maintain archive_status, check close_walfile() for details. */
322 if (stream->mark_done)
323 {
324 /* writes error message if failed */
325 if (!mark_file_as_archived(stream, histfname))
326 return false;
327 }
328
329 return true;
330}
static char * filename
Definition: pg_dumpall.c:119

References WalWriteMethodOps::close, CLOSE_NORMAL, CLOSE_UNLINK, filename, GetLastWalMethodError(), StreamCtl::mark_done, mark_file_as_archived(), MAXFNAMELEN, WalWriteMethodOps::open_for_write, WalWriteMethod::ops, pg_log_error, size, StreamCtl::timeline, TLHistoryFileName(), StreamCtl::walmethod, and WalWriteMethodOps::write.

Referenced by ReceiveXlogStream().

Variable Documentation

◆ lastFlushPosition

XLogRecPtr lastFlushPosition = InvalidXLogRecPtr
static

◆ reportFlushPosition

bool reportFlushPosition = false
static

Definition at line 29 of file receivelog.c.

Referenced by ProcessKeepaliveMsg(), ReceiveXlogStream(), and sendFeedback().

◆ still_sending

◆ walfile