59 snprintf(tmppath,
sizeof(tmppath),
"archive_status/%s.done",
66 pg_log_error(
"could not create archive status file \"%s\": %s",
73 pg_log_error(
"could not close archive status file \"%s\": %s",
122 pg_log_error(
"could not get size of write-ahead log file \"%s\": %s",
133 pg_log_error(
"could not open existing write-ahead log file \"%s\": %s",
142 pg_log_error(
"could not fsync existing write-ahead log file \"%s\": %s",
158 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
175 pg_log_error(
"could not open write-ahead log file \"%s\": %s",
216 pg_log_info(
"not renaming \"%s\", segment is not complete",
fn);
277 int size = strlen(content);
286 if (strcmp(histfname,
filename) != 0)
288 pg_log_error(
"server reported unexpected history file name for timeline %u: %s",
294 histfname,
".tmp", 0);
297 pg_log_error(
"could not create timeline history file \"%s\": %s",
304 pg_log_error(
"could not write timeline history file \"%s\": %s",
339 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
355 replybuf[
len] = replyRequested ? 1 : 0;
387 minServerMajor = 903;
388 maxServerMajor = PG_VERSION_NUM / 100;
390 if (serverMajor < minServerMajor)
394 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions older than %s",
395 serverver ? serverver :
"'unknown'",
399 else if (serverMajor > maxServerMajor)
403 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions newer than %s",
404 serverver ? serverver :
"'unknown'",
494 char *sysidentifier = NULL;
508 pg_log_error(
"system identifier does not match between base backup and streaming connection");
516 pg_log_error(
"starting timeline %u is not present in the server",
543 pg_log_error(
"could not send replication command \"%s\": %s",
555 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
575 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%X TIMELINE %u",
582 pg_log_error(
"could not send replication command \"%s\": %s",
624 if (newtimeline <= stream->timeline)
626 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
632 pg_log_error(
"server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
642 pg_log_error(
"unexpected termination of replication stream: %s",
672 pg_log_error(
"replication stream was terminated before stop point");
679 pg_log_error(
"unexpected termination of replication stream: %s",
718 pg_log_error(
"unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
725 &startpos_xrecoff) != 2)
727 pg_log_error(
"could not parse next timeline's starting point \"%s\"",
731 *
startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
775 pg_fatal(
"could not fsync file \"%s\": %s",
876 struct timeval timeout;
877 struct timeval *timeoutptr;
886 FD_ZERO(&input_mask);
887 FD_SET(connsocket, &input_mask);
891 FD_SET(stop_socket, &input_mask);
892 maxfd =
Max(maxfd, stop_socket);
899 timeout.tv_sec = timeout_ms / 1000L;
900 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
901 timeoutptr = &timeout;
904 ret =
select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
913 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
959 pg_log_error(
"could not receive data from WAL stream: %s",
1006 replyRequested =
copybuf[pos];
1022 pg_fatal(
"could not fsync file \"%s\": %s",
1083 pg_log_error(
"received write-ahead log record for offset %u with no file open",
1093 pg_log_error(
"got WAL data offset %08x, expected %08x",
1099 bytes_left =
len - hdr_len;
1110 if (xlogoff + bytes_left >
WalSegSz)
1111 bytes_to_write =
WalSegSz - xlogoff;
1113 bytes_to_write = bytes_left;
1125 copybuf + hdr_len + bytes_written,
1126 bytes_to_write) != bytes_to_write)
1128 pg_log_error(
"could not write %d bytes to WAL file \"%s\": %s",
1135 bytes_written += bytes_to_write;
1136 bytes_left -= bytes_to_write;
1137 *blockpos += bytes_to_write;
1138 xlogoff += bytes_to_write;
1203 *stoppos = blockpos;
1243 status_targettime = last_status +
1246 if (status_targettime > 0)
1262 sleeptime = secs * 1000 + usecs / 1000;
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
static void PGresult * res
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
int PQserverVersion(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQntuples(const PGresult *res)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
int PQnfields(const PGresult *res)
PGresult * PQgetResult(PGconn *conn)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
#define pg_log_error(...)
static int standby_message_timeout
static XLogRecPtr startpos
#define pg_log_warning(...)
size_t strlcpy(char *dst, const char *src, size_t siz)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
static bool reportFlushPosition
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool still_sending
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
bool CheckServerVersionForStreaming(PGconn *conn)
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
static XLogRecPtr lastFlushPosition
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
int standby_message_timeout
WalWriteMethod * walmethod
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
static StringInfo copybuf
static void * fn(void *arg)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define select(n, r, w, e, timeout)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr