58 snprintf(tmppath,
sizeof(tmppath),
"archive_status/%s.done",
65 pg_log_error(
"could not create archive status file \"%s\": %s",
72 pg_log_error(
"could not close archive status file \"%s\": %s",
121 pg_log_error(
"could not get size of write-ahead log file \"%s\": %s",
132 pg_log_error(
"could not open existing write-ahead log file \"%s\": %s",
141 pg_log_error(
"could not fsync existing write-ahead log file \"%s\": %s",
157 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
174 pg_log_error(
"could not open write-ahead log file \"%s\": %s",
215 pg_log_info(
"not renaming \"%s\", segment is not complete",
fn);
276 int size = strlen(content);
285 if (strcmp(histfname,
filename) != 0)
287 pg_log_error(
"server reported unexpected history file name for timeline %u: %s",
293 histfname,
".tmp", 0);
296 pg_log_error(
"could not create timeline history file \"%s\": %s",
303 pg_log_error(
"could not write timeline history file \"%s\": %s",
338 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
354 replybuf[
len] = replyRequested ? 1 : 0;
386 minServerMajor = 903;
387 maxServerMajor = PG_VERSION_NUM / 100;
389 if (serverMajor < minServerMajor)
393 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions older than %s",
394 serverver ? serverver :
"'unknown'",
398 else if (serverMajor > maxServerMajor)
402 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions newer than %s",
403 serverver ? serverver :
"'unknown'",
493 char *sysidentifier = NULL;
507 pg_log_error(
"system identifier does not match between base backup and streaming connection");
515 pg_log_error(
"starting timeline %u is not present in the server",
542 pg_log_error(
"could not send replication command \"%s\": %s",
554 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
574 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%X TIMELINE %u",
581 pg_log_error(
"could not send replication command \"%s\": %s",
623 if (newtimeline <= stream->timeline)
625 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
631 pg_log_error(
"server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
641 pg_log_error(
"unexpected termination of replication stream: %s",
671 pg_log_error(
"replication stream was terminated before stop point");
678 pg_log_error(
"unexpected termination of replication stream: %s",
717 pg_log_error(
"unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
724 &startpos_xrecoff) != 2)
726 pg_log_error(
"could not parse next timeline's starting point \"%s\"",
730 *
startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
774 pg_fatal(
"could not fsync file \"%s\": %s",
875 struct timeval timeout;
876 struct timeval *timeoutptr;
885 FD_ZERO(&input_mask);
886 FD_SET(connsocket, &input_mask);
890 FD_SET(stop_socket, &input_mask);
891 maxfd =
Max(maxfd, stop_socket);
898 timeout.tv_sec = timeout_ms / 1000L;
899 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
900 timeoutptr = &timeout;
903 ret =
select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
912 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
958 pg_log_error(
"could not receive data from WAL stream: %s",
1005 replyRequested =
copybuf[pos];
1021 pg_fatal(
"could not fsync file \"%s\": %s",
1082 pg_log_error(
"received write-ahead log record for offset %u with no file open",
1092 pg_log_error(
"got WAL data offset %08x, expected %08x",
1098 bytes_left =
len - hdr_len;
1109 if (xlogoff + bytes_left >
WalSegSz)
1110 bytes_to_write =
WalSegSz - xlogoff;
1112 bytes_to_write = bytes_left;
1124 copybuf + hdr_len + bytes_written,
1125 bytes_to_write) != bytes_to_write)
1127 pg_log_error(
"could not write %d bytes to WAL file \"%s\": %s",
1134 bytes_written += bytes_to_write;
1135 bytes_left -= bytes_to_write;
1136 *blockpos += bytes_to_write;
1137 xlogoff += bytes_to_write;
1202 *stoppos = blockpos;
1242 status_targettime = last_status +
1245 if (status_targettime > 0)
1261 sleeptime = secs * 1000 + usecs / 1000;
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
static void PGresult * res
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
int PQserverVersion(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQntuples(const PGresult *res)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
int PQnfields(const PGresult *res)
PGresult * PQgetResult(PGconn *conn)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
#define pg_log_error(...)
static int standby_message_timeout
static XLogRecPtr startpos
#define pg_log_warning(...)
size_t strlcpy(char *dst, const char *src, size_t siz)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
static bool reportFlushPosition
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool still_sending
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
bool CheckServerVersionForStreaming(PGconn *conn)
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
static XLogRecPtr lastFlushPosition
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
static pg_noinline void Size size
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
int standby_message_timeout
WalWriteMethod * walmethod
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
static StringInfo copybuf
static void * fn(void *arg)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define select(n, r, w, e, timeout)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr