PostgreSQL Source Code git master
Loading...
Searching...
No Matches
streamutil.c File Reference
#include "postgres_fe.h"
#include <sys/time.h>
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/connect.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#include "datatype/timestamp.h"
#include "port/pg_bswap.h"
#include "pqexpbuffer.h"
#include "streamutil.h"
Include dependency graph for streamutil.c:

Go to the source code of this file.

Macros

#define ERRCODE_DUPLICATE_OBJECT   "42710"
 
#define MINIMUM_VERSION_FOR_SHOW_CMD   100000
 
#define MINIMUM_VERSION_FOR_GROUP_ACCESS   110000
 

Functions

static bool RetrieveDataDirCreatePerm (PGconn *conn)
 
PGconn * GetConnection (void)
 
bool RetrieveWalSegSize (PGconn *conn)
 
bool RunIdentifySystem (PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
 
bool GetSlotInformation (PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
 
bool CreateReplicationSlot (PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, bool slot_exists_ok, bool two_phase, bool failover)
 
bool DropReplicationSlot (PGconn *conn, const char *slot_name)
 
void AppendQuotedString (PQExpBuffer buf, const char *str, char quote)
 
void AppendPlainCommandOption (PQExpBuffer buf, bool use_new_option_syntax, const char *option_name)
 
void AppendStringCommandOption (PQExpBuffer buf, bool use_new_option_syntax, const char *option_name, const char *option_value)
 
void AppendIntegerCommandOption (PQExpBuffer buf, bool use_new_option_syntax, const char *option_name, int32 option_value)
 
TimestampTz feGetCurrentTimestamp (void)
 
void feTimestampDifference (TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
 
bool feTimestampDifferenceExceeds (TimestampTz start_time, TimestampTz stop_time, int msec)
 
void fe_sendint64 (int64 i, char *buf)
 
int64 fe_recvint64 (char *buf)
 

Variables

int WalSegSz
 
const char * progname
 
char * connection_string = NULL
 
char * dbhost = NULL
 
char * dbuser = NULL
 
char * dbport = NULL
 
char * dbname = NULL
 
int dbgetpassword = 0
 
static char * password = NULL
 
PGconn * conn = NULL
 

Macro Definition Documentation

◆ ERRCODE_DUPLICATE_OBJECT

#define ERRCODE_DUPLICATE_OBJECT   "42710"

Definition at line 30 of file streamutil.c.

◆ MINIMUM_VERSION_FOR_GROUP_ACCESS

#define MINIMUM_VERSION_FOR_GROUP_ACCESS   110000

Definition at line 42 of file streamutil.c.

◆ MINIMUM_VERSION_FOR_SHOW_CMD

#define MINIMUM_VERSION_FOR_SHOW_CMD   100000

Definition at line 37 of file streamutil.c.

Function Documentation

◆ AppendIntegerCommandOption()

void AppendIntegerCommandOption ( PQExpBuffer  buf,
bool  use_new_option_syntax,
const char * option_name,
int32  option_value 
)

Definition at line 817 of file streamutil.c.

819{
821
823}
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
static int fb(int x)
void AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax, const char *option_name)
Definition streamutil.c:777

References AppendPlainCommandOption(), appendPQExpBuffer(), buf, and fb().

Referenced by BaseBackup().

◆ AppendPlainCommandOption()

void AppendPlainCommandOption ( PQExpBuffer  buf,
bool  use_new_option_syntax,
const char * option_name 
)

Definition at line 777 of file streamutil.c.

779{
780 if (buf->len > 0 && buf->data[buf->len - 1] != '(')
781 {
784 else
786 }
787
788 appendPQExpBuffer(buf, " %s", option_name);
789}
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and fb().

Referenced by AppendIntegerCommandOption(), AppendStringCommandOption(), BaseBackup(), and CreateReplicationSlot().

◆ AppendQuotedString()

void AppendQuotedString ( PQExpBuffer  buf,
const char * str,
char  quote 
)

Definition at line 750 of file streamutil.c.

751{
753 while (*str)
754 {
755 char c = *str++;
756
757 if (c == quote)
760 }
762}
const char * str
char * c

References appendPQExpBufferChar(), buf, and str.

◆ AppendStringCommandOption()

void AppendStringCommandOption ( PQExpBuffer  buf,
bool  use_new_option_syntax,
const char * option_name,
const char * option_value 
)

Definition at line 798 of file streamutil.c.

800{
802
803 if (option_value != NULL)
804 {
807 }
808}
#define AppendQuotedLiteral(b, s)
Definition streamutil.h:48

References AppendPlainCommandOption(), appendPQExpBufferChar(), AppendQuotedLiteral, buf, and fb().

Referenced by BaseBackup(), and CreateReplicationSlot().

◆ CreateReplicationSlot()

bool CreateReplicationSlot ( PGconn * conn,
const char * slot_name,
const char * plugin,
bool  is_temporary,
bool  is_physical,
bool  reserve_wal,
bool  slot_exists_ok,
bool  two_phase,
bool  failover 
)

Definition at line 585 of file streamutil.c.

588{
589 PQExpBuffer query;
590 PGresult *res;
591 bool use_new_option_syntax = (PQserverVersion(conn) >= 150000);
592
593 query = createPQExpBuffer();
594
595 Assert((is_physical && plugin == NULL) ||
596 (!is_physical && plugin != NULL));
599 Assert(slot_name != NULL);
600
601 /* Build base portion of query */
602 appendPQExpBufferStr(query, "CREATE_REPLICATION_SLOT ");
603 AppendQuotedIdentifier(query, slot_name);
604 if (is_temporary)
605 appendPQExpBufferStr(query, " TEMPORARY");
606 if (is_physical)
607 appendPQExpBufferStr(query, " PHYSICAL");
608 else
609 {
610 appendPQExpBufferStr(query, " LOGICAL ");
612 }
613
614 /* Add any requested options */
616 appendPQExpBufferStr(query, " (");
617 if (is_physical)
618 {
619 if (reserve_wal)
621 "RESERVE_WAL");
622 }
623 else
624 {
625 if (failover && PQserverVersion(conn) >= 170000)
627 "FAILOVER");
628
629 if (two_phase && PQserverVersion(conn) >= 150000)
631 "TWO_PHASE");
632
633 if (PQserverVersion(conn) >= 100000)
634 {
635 /* pg_recvlogical doesn't use an exported snapshot, so suppress */
638 "SNAPSHOT", "nothing");
639 else
641 "NOEXPORT_SNAPSHOT");
642 }
643 }
645 {
646 /* Suppress option list if it would be empty, otherwise terminate */
647 if (query->data[query->len - 1] == '(')
648 {
649 query->len -= 2;
650 query->data[query->len] = '\0';
651 }
652 else
653 appendPQExpBufferChar(query, ')');
654 }
655
656 /* Now run the query */
657 res = PQexec(conn, query->data);
659 {
660 const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
661
662 if (slot_exists_ok &&
663 sqlstate &&
664 strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
665 {
666 destroyPQExpBuffer(query);
667 PQclear(res);
668 return true;
669 }
670 else
671 {
672 pg_log_error("could not send replication command \"%s\": %s",
673 query->data, PQerrorMessage(conn));
674
675 destroyPQExpBuffer(query);
676 PQclear(res);
677 return false;
678 }
679 }
680
681 if (PQntuples(res) != 1 || PQnfields(res) != 4)
682 {
683 pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
684 slot_name,
685 PQntuples(res), PQnfields(res), 1, 4);
686
687 destroyPQExpBuffer(query);
688 PQclear(res);
689 return false;
690 }
691
692 destroyPQExpBuffer(query);
693 PQclear(res);
694 return true;
695}
#define Assert(condition)
Definition c.h:943
int PQserverVersion(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
#define PQclear
#define PQresultErrorField
#define PQnfields
#define PQresultStatus
#define PQntuples
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
#define pg_log_error(...)
Definition logging.h:108
static bool slot_exists_ok
static bool two_phase
static bool failover
static const char * plugin
#define PG_DIAG_SQLSTATE
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void destroyPQExpBuffer(PQExpBuffer str)
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
void AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax, const char *option_name, const char *option_value)
Definition streamutil.c:798
PGconn * conn
Definition streamutil.c:52
#define AppendQuotedIdentifier(b, s)
Definition streamutil.h:47

References AppendPlainCommandOption(), appendPQExpBufferChar(), appendPQExpBufferStr(), AppendQuotedIdentifier, AppendStringCommandOption(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), ERRCODE_DUPLICATE_OBJECT, failover, fb(), PQExpBufferData::len, PG_DIAG_SQLSTATE, pg_log_error, PGRES_TUPLES_OK, plugin, PQclear, PQerrorMessage(), PQexec(), PQnfields, PQntuples, PQresultErrorField, PQresultStatus, PQserverVersion(), slot_exists_ok, and two_phase.

◆ DropReplicationSlot()

bool DropReplicationSlot ( PGconn * conn,
const char * slot_name 
)

Definition at line 702 of file streamutil.c.

703{
704 PQExpBuffer query;
705 PGresult *res;
706
707 Assert(slot_name != NULL);
708
709 query = createPQExpBuffer();
710
711 /* Build query */
712 appendPQExpBufferStr(query, "DROP_REPLICATION_SLOT ");
713 AppendQuotedIdentifier(query, slot_name);
714 res = PQexec(conn, query->data);
716 {
717 pg_log_error("could not send replication command \"%s\": %s",
718 query->data, PQerrorMessage(conn));
719
720 destroyPQExpBuffer(query);
721 PQclear(res);
722 return false;
723 }
724
725 if (PQntuples(res) != 0 || PQnfields(res) != 0)
726 {
727 pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
728 slot_name,
729 PQntuples(res), PQnfields(res), 0, 0);
730
731 destroyPQExpBuffer(query);
732 PQclear(res);
733 return false;
734 }
735
736 destroyPQExpBuffer(query);
737 PQclear(res);
738 return true;
739}
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131

References appendPQExpBufferStr(), AppendQuotedIdentifier, Assert, conn, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), fb(), pg_log_error, PGRES_COMMAND_OK, PQclear, PQerrorMessage(), PQexec(), PQnfields, PQntuples, and PQresultStatus.

◆ fe_recvint64()

int64 fe_recvint64 ( char * buf)

Definition at line 895 of file streamutil.c.

896{
897 uint64 n64;
898
899 memcpy(&n64, buf, sizeof(n64));
900
901 return pg_ntoh64(n64);
902}
uint64_t uint64
Definition c.h:625
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
#define pg_ntoh64(x)
Definition pg_bswap.h:126

References buf, fb(), memcpy(), and pg_ntoh64.

Referenced by ProcessWALDataMsg(), and StreamLogicalLog().

◆ fe_sendint64()

void fe_sendint64 ( int64  i,
char * buf 
)

Definition at line 884 of file streamutil.c.

885{
887
888 memcpy(buf, &n64, sizeof(n64));
889}
int i
Definition isn.c:77
#define pg_hton64(x)
Definition pg_bswap.h:122

References buf, fb(), i, memcpy(), and pg_hton64.

Referenced by sendFeedback(), and sendFeedback().

◆ feGetCurrentTimestamp()

TimestampTz feGetCurrentTimestamp ( void  )

Definition at line 830 of file streamutil.c.

831{
833 struct timeval tp;
834
835 gettimeofday(&tp, NULL);
836
837 result = (TimestampTz) tp.tv_sec -
839 result = (result * USECS_PER_SEC) + tp.tv_usec;
840
841 return result;
842}
uint32 result
int64 TimestampTz
Definition timestamp.h:39
#define USECS_PER_SEC
Definition timestamp.h:134
#define UNIX_EPOCH_JDATE
Definition timestamp.h:234
#define SECS_PER_DAY
Definition timestamp.h:126
#define POSTGRES_EPOCH_JDATE
Definition timestamp.h:235
int gettimeofday(struct timeval *tp, void *tzp)

References fb(), gettimeofday(), POSTGRES_EPOCH_JDATE, result, SECS_PER_DAY, UNIX_EPOCH_JDATE, and USECS_PER_SEC.

Referenced by flushAndSendFeedback(), HandleCopyStream(), main(), ProcessKeepaliveMsg(), and StreamLogicalLog().

◆ feTimestampDifference()

void feTimestampDifference ( TimestampTz  start_time,
TimestampTz  stop_time,
long * secs,
int * microsecs 
)

Definition at line 849 of file streamutil.c.

851{
853
854 if (diff <= 0)
855 {
856 *secs = 0;
857 *microsecs = 0;
858 }
859 else
860 {
861 *secs = (long) (diff / USECS_PER_SEC);
863 }
864}
static time_t start_time
Definition pg_ctl.c:96

References fb(), start_time, and USECS_PER_SEC.

Referenced by CalculateCopyStreamSleeptime(), and StreamLogicalLog().

◆ feTimestampDifferenceExceeds()

bool feTimestampDifferenceExceeds ( TimestampTz  start_time,
TimestampTz  stop_time,
int  msec 
)

Definition at line 871 of file streamutil.c.

874{
876
877 return (diff >= msec * INT64CONST(1000));
878}
#define INT64CONST(x)
Definition c.h:630

References fb(), INT64CONST, and start_time.

Referenced by HandleCopyStream(), and StreamLogicalLog().

◆ GetConnection()

PGconn * GetConnection ( void  )

Definition at line 60 of file streamutil.c.

61{
63 int argcount = 7; /* dbname, replication, fallback_app_name,
64 * host, user, port, password */
65 int i;
66 const char **keywords;
67 const char **values;
68 const char *tmpparam;
69 bool need_password;
72 char *err_msg = NULL;
73
74 /*
75 * pg_recvlogical uses dbname only; others use connection_string only.
76 * (Note: both variables will be NULL if there's no command line options.)
77 */
79
80 /*
81 * Merge the connection info inputs given in form of connection string,
82 * options and default values (dbname=replication, replication=true, etc.)
83 */
84 i = 0;
86 {
88 if (conn_opts == NULL)
89 pg_fatal("%s", err_msg);
90
91 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
92 {
93 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
94 argcount++;
95 }
96
97 keywords = pg_malloc0_array(const char *, argcount + 1);
98 values = pg_malloc0_array(const char *, argcount + 1);
99
100 /*
101 * Set dbname here already, so it can be overridden by a dbname in the
102 * connection string.
103 */
104 keywords[i] = "dbname";
105 values[i] = "replication";
106 i++;
107
108 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
109 {
110 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
111 {
113 values[i] = conn_opt->val;
114 i++;
115 }
116 }
117 }
118 else
119 {
120 keywords = pg_malloc0_array(const char *, argcount + 1);
121 values = pg_malloc0_array(const char *, argcount + 1);
122 keywords[i] = "dbname";
123 values[i] = (dbname == NULL) ? "replication" : dbname;
124 i++;
125 }
126
127 keywords[i] = "replication";
128 values[i] = (dbname == NULL) ? "true" : "database";
129 i++;
130 keywords[i] = "fallback_application_name";
131 values[i] = progname;
132 i++;
133
134 if (dbhost)
135 {
136 keywords[i] = "host";
137 values[i] = dbhost;
138 i++;
139 }
140 if (dbuser)
141 {
142 keywords[i] = "user";
143 values[i] = dbuser;
144 i++;
145 }
146 if (dbport)
147 {
148 keywords[i] = "port";
149 values[i] = dbport;
150 i++;
151 }
152
153 /* If -W was given, force prompt for password, but only the first time */
155
156 do
157 {
158 /* Get a new password if appropriate */
159 if (need_password)
160 {
161 free(password);
162 password = simple_prompt("Password: ", false);
163 need_password = false;
164 }
165
166 /* Use (or reuse, on a subsequent connection) password if we have it */
167 if (password)
168 {
169 keywords[i] = "password";
170 values[i] = password;
171 }
172 else
173 {
174 keywords[i] = NULL;
175 values[i] = NULL;
176 }
177
178 /*
179 * Only expand dbname when we did not already parse the argument as a
180 * connection string ourselves.
181 */
183
184 /*
185 * If there is too little memory even to allocate the PGconn object
186 * and PQconnectdbParams returns NULL, we call exit(1) directly.
187 */
188 if (!tmpconn)
189 pg_fatal("could not connect to server");
190
191 /* If we need a password and -w wasn't given, loop back and get one */
194 dbgetpassword != -1)
195 {
197 need_password = true;
198 }
199 }
200 while (need_password);
201
203 {
206 free(values);
207 free(keywords);
209 return NULL;
210 }
211
212 /* Connection ok! */
213 free(values);
214 free(keywords);
216
217 /*
218 * Set always-secure search path, so malicious users can't get control.
219 * The capacity to run normal SQL queries was added in PostgreSQL 10, so
220 * the search path cannot be changed (by us or attackers) on earlier
221 * versions.
222 */
223 if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
224 {
225 PGresult *res;
226
229 {
230 pg_log_error("could not clear \"search_path\": %s",
232 PQclear(res);
234 exit(1);
235 }
236 PQclear(res);
237 }
238
239 /*
240 * Ensure we have the same value of integer_datetimes (now always "on") as
241 * the server we are connecting to.
242 */
243 tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
244 if (!tmpparam)
245 {
246 pg_log_error("could not determine server setting for \"integer_datetimes\"");
248 exit(1);
249 }
250
251 if (strcmp(tmpparam, "on") != 0)
252 {
253 pg_log_error("\"integer_datetimes\" compile flag does not match server");
255 exit(1);
256 }
257
258 /*
259 * Retrieve the source data directory mode and use it to construct a umask
260 * for creating directories and files.
261 */
263 {
265 exit(1);
266 }
267
268 return tmpconn;
269}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
int PQconnectionNeedsPassword(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition fe-connect.c:775
#define pg_malloc0_array(type, count)
Definition fe_memutils.h:67
static const JsonPathKeyword keywords[]
@ CONNECTION_BAD
Definition libpq-fe.h:91
@ CONNECTION_OK
Definition libpq-fe.h:90
#define pg_fatal(...)
#define free(a)
char * simple_prompt(const char *prompt, bool echo)
Definition sprompt.c:38
int dbgetpassword
Definition streamutil.c:50
char * dbhost
Definition streamutil.c:46
static char * password
Definition streamutil.c:51
char * dbport
Definition streamutil.c:48
char * connection_string
Definition streamutil.c:45
const char * progname
Definition streamutil.c:44
static bool RetrieveDataDirCreatePerm(PGconn *conn)
Definition streamutil.c:355
char * dbname
Definition streamutil.c:49
char * dbuser
Definition streamutil.c:47
const char * keyword

References ALWAYS_SECURE_SEARCH_PATH_SQL, Assert, CONNECTION_BAD, CONNECTION_OK, connection_string, dbgetpassword, dbhost, dbname, dbport, dbuser, fb(), free, i, JsonPathKeyword::keyword, keywords, password, pg_fatal, pg_log_error, pg_malloc0_array, PGRES_TUPLES_OK, PQclear, PQconnectdbParams(), PQconnectionNeedsPassword(), PQconninfoFree(), PQconninfoParse(), PQerrorMessage(), PQexec(), PQfinish(), PQparameterStatus(), PQresultStatus, PQserverVersion(), PQstatus(), progname, RetrieveDataDirCreatePerm(), simple_prompt(), and values.

Referenced by create_foreign_modify(), dumpDatabase(), dumpDatabaseConfig(), dumpLOs(), dumpTableData_copy(), estimate_path_cost_size(), expand_extension_name_patterns(), expand_foreign_server_name_patterns(), expand_schema_name_patterns(), expand_table_name_patterns(), fetch_remote_statistics(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginDirectModify(), postgresBeginForeignScan(), postgresExecForeignTruncate(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), setup_connection(), StartLogStreamer(), StreamLog(), and StreamLogicalLog().

◆ GetSlotInformation()

bool GetSlotInformation ( PGconn * conn,
const char * slot_name,
XLogRecPtr * restart_lsn,
TimeLineID * restart_tli 
)

Definition at line 490 of file streamutil.c.

492{
493 PGresult *res;
494 PQExpBuffer query;
497
498 if (restart_lsn)
499 *restart_lsn = lsn_loc;
500 if (restart_tli)
502
503 query = createPQExpBuffer();
504 appendPQExpBufferStr(query, "READ_REPLICATION_SLOT ");
505 AppendQuotedIdentifier(query, slot_name);
506 res = PQexec(conn, query->data);
507 destroyPQExpBuffer(query);
508
510 {
511 pg_log_error("could not send replication command \"%s\": %s",
512 "READ_REPLICATION_SLOT", PQerrorMessage(conn));
513 PQclear(res);
514 return false;
515 }
516
517 /* The command should always return precisely one tuple and three fields */
518 if (PQntuples(res) != 1 || PQnfields(res) != 3)
519 {
520 pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
521 slot_name, PQntuples(res), PQnfields(res), 1, 3);
522 PQclear(res);
523 return false;
524 }
525
526 /*
527 * When the slot doesn't exist, the command returns a tuple with NULL
528 * values. This checks only the slot type field.
529 */
530 if (PQgetisnull(res, 0, 0))
531 {
532 pg_log_error("replication slot \"%s\" does not exist", slot_name);
533 PQclear(res);
534 return false;
535 }
536
537 /*
538 * Note that this cannot happen as READ_REPLICATION_SLOT supports only
539 * physical slots, but play it safe.
540 */
541 if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
542 {
543 pg_log_error("expected a physical replication slot, got type \"%s\" instead",
544 PQgetvalue(res, 0, 0));
545 PQclear(res);
546 return false;
547 }
548
549 /* restart LSN */
550 if (!PQgetisnull(res, 0, 1))
551 {
552 uint32 hi,
553 lo;
554
555 if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &hi, &lo) != 2)
556 {
557 pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
558 PQgetvalue(res, 0, 1), slot_name);
559 PQclear(res);
560 return false;
561 }
562 lsn_loc = ((uint64) hi) << 32 | lo;
563 }
564
565 /* current TLI */
566 if (!PQgetisnull(res, 0, 2))
567 tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));
568
569 PQclear(res);
570
571 /* Assign results if requested */
572 if (restart_lsn)
573 *restart_lsn = lsn_loc;
574 if (restart_tli)
576
577 return true;
578}
uint32_t uint32
Definition c.h:624
#define PQgetvalue
#define PQgetisnull
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint32 TimeLineID
Definition xlogdefs.h:63

References appendPQExpBufferStr(), AppendQuotedIdentifier, conn, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), fb(), InvalidXLogRecPtr, pg_log_error, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQexec(), PQgetisnull, PQgetvalue, PQnfields, PQntuples, and PQresultStatus.

Referenced by StreamLog().

◆ RetrieveDataDirCreatePerm()

static bool RetrieveDataDirCreatePerm ( PGconn * conn)
static

Definition at line 355 of file streamutil.c.

356{
357 PGresult *res;
359
360 /* check connection existence */
361 Assert(conn != NULL);
362
363 /* for previous versions leave the default group access */
365 return true;
366
367 res = PQexec(conn, "SHOW data_directory_mode");
369 {
370 pg_log_error("could not send replication command \"%s\": %s",
371 "SHOW data_directory_mode", PQerrorMessage(conn));
372
373 PQclear(res);
374 return false;
375 }
376 if (PQntuples(res) != 1 || PQnfields(res) < 1)
377 {
378 pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
379 PQntuples(res), PQnfields(res), 1, 1);
380
381 PQclear(res);
382 return false;
383 }
384
385 if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
386 {
387 pg_log_error("group access flag could not be parsed: %s",
388 PQgetvalue(res, 0, 0));
389
390 PQclear(res);
391 return false;
392 }
393
395
396 PQclear(res);
397 return true;
398}
void SetDataDirectoryCreatePerm(int dataDirMode)
Definition file_perm.c:34
int data_directory_mode
Definition globals.c:79
#define MINIMUM_VERSION_FOR_GROUP_ACCESS
Definition streamutil.c:42

References Assert, conn, data_directory_mode, fb(), MINIMUM_VERSION_FOR_GROUP_ACCESS, pg_log_error, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQexec(), PQgetvalue, PQnfields, PQntuples, PQresultStatus, PQserverVersion(), and SetDataDirectoryCreatePerm().

Referenced by GetConnection().

◆ RetrieveWalSegSize()

bool RetrieveWalSegSize ( PGconn * conn)

Definition at line 276 of file streamutil.c.

277{
278 PGresult *res;
279 char xlog_unit[3];
280 int xlog_val,
281 multiplier = 1;
282
283 /* check connection existence */
284 Assert(conn != NULL);
285
286 /* for previous versions set the default xlog seg size */
288 {
290 return true;
291 }
292
293 res = PQexec(conn, "SHOW wal_segment_size");
295 {
296 pg_log_error("could not send replication command \"%s\": %s",
297 "SHOW wal_segment_size", PQerrorMessage(conn));
298
299 PQclear(res);
300 return false;
301 }
302 if (PQntuples(res) != 1 || PQnfields(res) < 1)
303 {
304 pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
305 PQntuples(res), PQnfields(res), 1, 1);
306
307 PQclear(res);
308 return false;
309 }
310
311 /* fetch xlog value and unit from the result */
312 if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
313 {
314 pg_log_error("WAL segment size could not be parsed");
315 PQclear(res);
316 return false;
317 }
318
319 PQclear(res);
320
321 /* set the multiplier based on unit to convert xlog_val to bytes */
322 if (strcmp(xlog_unit, "MB") == 0)
323 multiplier = 1024 * 1024;
324 else if (strcmp(xlog_unit, "GB") == 0)
325 multiplier = 1024 * 1024 * 1024;
326
327 /* convert and set WalSegSz */
328 WalSegSz = xlog_val * multiplier;
329
331 {
332 pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
333 "remote server reported invalid WAL segment size (%d bytes)",
334 WalSegSz),
335 WalSegSz);
336 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
337 return false;
338 }
339
340 return true;
341}
#define ngettext(s, p, n)
Definition c.h:1270
#define pg_log_error_detail(...)
Definition logging.h:111
#define DEFAULT_XLOG_SEG_SIZE
int WalSegSz
Definition streamutil.c:32
#define MINIMUM_VERSION_FOR_SHOW_CMD
Definition streamutil.c:37
#define IsValidWalSegSize(size)

References Assert, conn, DEFAULT_XLOG_SEG_SIZE, fb(), IsValidWalSegSize, MINIMUM_VERSION_FOR_SHOW_CMD, ngettext, pg_log_error, pg_log_error_detail, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQexec(), PQgetvalue, PQnfields, PQntuples, PQresultStatus, PQserverVersion(), and WalSegSz.

Referenced by main().

◆ RunIdentifySystem()

bool RunIdentifySystem ( PGconn * conn,
char **  sysid,
TimeLineID * starttli,
XLogRecPtr * startpos,
char **  db_name 
)

Definition at line 409 of file streamutil.c.

411{
412 PGresult *res;
413 uint32 hi,
414 lo;
415
416 /* Check connection existence */
417 Assert(conn != NULL);
418
419 res = PQexec(conn, "IDENTIFY_SYSTEM");
421 {
422 pg_log_error("could not send replication command \"%s\": %s",
423 "IDENTIFY_SYSTEM", PQerrorMessage(conn));
424
425 PQclear(res);
426 return false;
427 }
428 if (PQntuples(res) != 1 || PQnfields(res) < 3)
429 {
430 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
431 PQntuples(res), PQnfields(res), 1, 3);
432
433 PQclear(res);
434 return false;
435 }
436
437 /* Get system identifier */
438 if (sysid != NULL)
439 *sysid = pg_strdup(PQgetvalue(res, 0, 0));
440
441 /* Get timeline ID to start streaming from */
442 if (starttli != NULL)
443 *starttli = atoi(PQgetvalue(res, 0, 1));
444
445 /* Get LSN start position if necessary */
446 if (startpos != NULL)
447 {
448 if (sscanf(PQgetvalue(res, 0, 2), "%X/%08X", &hi, &lo) != 2)
449 {
450 pg_log_error("could not parse write-ahead log location \"%s\"",
451 PQgetvalue(res, 0, 2));
452
453 PQclear(res);
454 return false;
455 }
456 *startpos = ((uint64) hi) << 32 | lo;
457 }
458
459 /* Get database name, only available in 9.4 and newer versions */
460 if (db_name != NULL)
461 {
462 *db_name = NULL;
463 if (PQserverVersion(conn) >= 90400)
464 {
465 if (PQnfields(res) < 4)
466 {
467 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
468 PQntuples(res), PQnfields(res), 1, 4);
469
470 PQclear(res);
471 return false;
472 }
473 if (!PQgetisnull(res, 0, 3))
474 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
475 }
476 }
477
478 PQclear(res);
479 return true;
480}
char * pg_strdup(const char *in)
Definition fe_memutils.c:91
static XLogRecPtr startpos

References Assert, conn, fb(), pg_log_error, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQexec(), PQgetisnull, PQgetvalue, PQnfields, PQntuples, PQresultStatus, PQserverVersion(), and startpos.

Referenced by BaseBackup(), main(), ReceiveXlogStream(), and StreamLog().

Variable Documentation

◆ conn

PGconn* conn = NULL

Definition at line 52 of file streamutil.c.

Referenced by append_db_pattern_cte(), append_rel_pattern_filtered_cte(), append_rel_pattern_raw_cte(), appendQualifiedRelation(), appendStringLiteralConn(), applyRemoteGucs(), BaseBackup(), build_client_final_message(), build_client_first_message(), build_startup_packet(), buildShSecLabels(), canChangeResultMode(), check_and_drop_existing_subscriptions(), check_and_drop_publications(), check_expected_areq(), check_for_pg_role_prefix(), check_for_prepared_transactions(), check_for_unsupported_encodings(), check_is_install_user(), check_loadable_libraries(), check_new_cluster_replication_slots(), check_new_cluster_subscription_configuration(), check_old_cluster_subscription_state(), check_prepare_conn(), check_publisher(), check_subscriber(), CheckCopyStreamStop(), CheckServerVersionForStreaming(), cleanup_oauth_flow(), cleanup_objects_atexit(), clear_allowed_sasl_mechs(), client_initial_response(), close_cursor(), cluster_all_databases(), cluster_one_database(), compile_database_list(), compile_relation_list_one_db(), configure_remote_session(), confirm_result_status_impl(), connect_database(), connect_pg_server(), ConnectDatabase(), connectDatabase(), connectFailureMessage(), connection_failed(), connectMaintenanceDatabase(), connectNoDelay(), connectOptions1(), connectToServer(), consume_null_result_impl(), consume_query_cancel_impl(), consume_result_status_impl(), copy_connection(), copy_sequences(), CopyStreamPoll(), CopyStreamReceive(), create_cursor(), create_logical_replication_slot(), create_logical_replication_slots(), create_publication(), create_subscription(), CreateReplicationSlot(), dblink_cancel_query(), dblink_close(), dblink_connect(), dblink_disconnect(), dblink_error_message(), dblink_exec(), dblink_fetch(), dblink_get_conn(), dblink_get_notify(), dblink_is_busy(), dblink_open(), dblink_record_internal(), dblink_res_error(), dblink_res_internalerror(), dblink_security_check(), dblink_send_query(), disconnect_atexit(), disconnect_atexit(), 
disconnect_atexit(), disconnect_atexit(), disconnect_database(), disconnectDatabase(), do_async(), do_sql_command(), do_sql_command_begin(), do_sql_command_end(), doConnect(), drop_existing_subscription(), drop_failover_replication_slots(), drop_primary_replication_slot(), drop_publication(), drop_replication_slot(), dropDBs(), DropReplicationSlot(), dropRoles(), dropTablespaces(), dumpDatabase(), dumpDatabaseConfig(), dumpDatabases(), dumpLOs(), dumpRoleGUCPrivs(), dumpRoleMembership(), dumpRoles(), dumpTableData_copy(), dumpTablespaces(), dumpUserConfig(), ecpg_raise_backend(), emitHostIdentityInfo(), emitShSecLabels(), enable_subscription(), escape_append_literal(), escape_fmt_id(), escape_identifier(), escape_literal(), escape_string_conn(), estimate_path_cost_size(), executeCommand(), executeCommand(), executeMaintenanceCommand(), executeQuery(), executeQuery(), executeQueryOrDie(), ExecuteSqlCommand(), exit_nicely(), exit_nicely(), exit_nicely(), exit_nicely(), exit_nicely(), exit_nicely(), expand_dbname_patterns(), exportFile(), exportFile(), fetch_attstats(), fetch_more_data(), fetch_relstats(), fetch_remote_statistics(), fill_allowed_sasl_mechs(), fillPGconn(), find_publication(), flushAndSendFeedback(), freePGconn(), gen_reindex_command(), generate_object_name(), get_db_conn(), get_db_infos(), get_parallel_tabidx_list(), get_parallel_tables_list(), get_primary_sysid(), get_publisher_databases(), get_remote_estimate(), get_subscription_info(), get_tablespace_paths(), get_template0_info(), getAnotherTuple(), getBackendKeyData(), getCopyDataMessage(), getCopyResult(), getCopyStart(), getHostaddr(), getNotify(), getParamDescriptions(), getParameterStatus(), getReadyForQuery(), getRowDescriptions(), GetSlotInformation(), gss_read(), handle_oauth_sasl_error(), handleCopyIn(), handleCopyOut(), HandleCopyStream(), HandleEndOfCopyStream(), handleFatalError(), handleSyncLoss(), importFile(), importFile(), index_of_allowed_sasl_mech(), 
init_allowed_encryption_methods(), init_libpq_conn(), init_libpq_source(), initialize_SSL(), internal_ping(), issuer_from_well_known_uri(), libpq_append_conn_error(), libpq_append_grease_info(), libpq_fetch_file(), libpq_get_current_wal_insert_lsn(), libpq_prng_init(), libpq_traverse_files(), libpqrcv_alter_slot(), libpqrcv_connect(), libpqrcv_exec(), libpqrcv_get_backend_pid(), libpqrcv_get_conninfo(), libpqrcv_get_senderinfo(), libpqrcv_identify_system(), libpqrcv_server_version(), libpqsrv_cancel(), libpqsrv_connect(), libpqsrv_connect_complete(), libpqsrv_connect_params(), libpqsrv_disconnect(), libpqsrv_exec(), libpqsrv_exec_params(), libpqsrv_get_result(), libpqsrv_get_result_last(), libpqsrv_PQgetResult(), lo_close(), lo_creat(), lo_create(), lo_export(), lo_import(), lo_import_internal(), lo_import_with_oid(), lo_initialize(), lo_lseek(), lo_lseek64(), lo_open(), lo_read(), lo_tell(), lo_tell64(), lo_truncate(), lo_truncate64(), lo_unlink(), lo_write(), main(), main(), makeAlterConfigCommand(), materializeQueryResult(), materializeResult(), my_truncate(), oauth_exchange(), oauth_init(), old_9_6_invalidate_hash_indexes(), open_client_SSL(), openssl_verify_peer_name_matches_certificate_ip(), openssl_verify_peer_name_matches_certificate_name(), overwrite(), overwrite(), ParallelSlotsAdoptConn(), ParallelSlotsTerminate(), parseInput(), pg_fe_run_oauth_flow(), pg_fe_run_oauth_flow_impl(), pg_fe_sendauth(), pg_GSS_error(), pg_GSS_load_servicename(), pg_GSS_read(), pg_GSS_write(), pg_password_sendauth(), pg_SASL_continue(), pg_SASL_init(), pg_start_oauthbearer(), pgconn_bio_ctrl(), pgconn_bio_read(), pgfdw_cancel_query(), pgfdw_cancel_query_begin(), pgfdw_cancel_query_end(), pgfdw_conn_check(), pgfdw_exec_cleanup_query(), pgfdw_exec_cleanup_query_begin(), pgfdw_exec_cleanup_query_end(), pgfdw_exec_query(), pgfdw_get_cleanup_result(), pgfdw_get_result(), pgfdw_report(), pgfdw_report_error(), pgfdw_report_internal(), pgfdw_security_check(), pgpassfileWarning(), 
pgtls_close(), pgtls_get_peer_certificate_hash(), pgtls_open_client(), pgtls_read(), pgtls_read_pending(), pgtls_verify_peer_name_matches_certificate_guts(), pgtls_write(), pickout(), pickout(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresExecForeignTruncate(), postgresGetAnalyzeInfoForForeignTable(), postgresImportForeignSchema(), pq_verify_peer_name_matches_certificate(), pq_verify_peer_name_matches_certificate_ip(), pq_verify_peer_name_matches_certificate_name(), pqAllocCmdQueueEntry(), pqAppendCmdQueueEntry(), PQbackendPID(), pqBuildStartupPacket3(), PQcancelCreate(), PQcancelPoll(), PQchangePassword(), pqCheckInBufferSpace(), pqCheckOutBufferSpace(), pqClearAsyncResult(), pqClearOAuthToken(), PQclientEncoding(), pqClosePGconn(), PQclosePortal(), PQclosePrepared(), pqCommandQueueAdvance(), PQconnectdb(), pqConnectDBComplete(), PQconnectdbParams(), pqConnectDBStart(), PQconnectionNeedsPassword(), PQconnectionUsedGSSAPI(), PQconnectionUsedPassword(), pqConnectOptions2(), PQconnectPoll(), PQconnectStart(), PQconnectStartParams(), PQconninfo(), PQconsumeInput(), PQdb(), PQdefaultSSLKeyPassHook_OpenSSL(), PQdescribePortal(), PQdescribePrepared(), pqDropConnection(), pqDropServerData(), PQencryptPasswordConn(), PQendcopy(), pqEndcopy3(), PQenterPipelineMode(), PQerrorMessage(), PQescapeByteaConn(), PQescapeByteaInternal(), PQescapeIdentifier(), PQescapeInternal(), PQescapeLiteral(), PQescapeStringConn(), PQescapeStringInternal(), PQexec(), PQexecFinish(), PQexecParams(), PQexecPrepared(), PQexecStart(), PQexitPipelineMode(), PQfinish(), PQfireResultCreateEvents(), PQflush(), pqFlush(), PQfn(), PQfullProtocolVersion(), pqFunctionCall3(), pqGetc(), PQgetCancel(), PQgetCopyData(), pqGetCopyData3(), pqGetErrorNotice3(), PQgetgssctx(), pqGetInt(), PQgetline(), pqGetline3(), PQgetlineAsync(), pqGetlineAsync3(), pqGetnchar(), pqGetNegotiateProtocolVersion3(), PQgetResult(), pqGets(), pqGets_append(), pqGets_internal(), PQgetssl(), 
PQgssEncInUse(), PQhost(), PQhostaddr(), PQinstanceData(), PQisBusy(), PQisnonblocking(), pqMakeEmptyPGconn(), PQmakeEmptyPGresult(), PQnfn(), PQnotifies(), PQoptions(), pqPacketSend(), PQparameterStatus(), pqParseDone(), pqParseInput3(), pqParseIntParam(), pqParseProtocolVersion(), PQpass(), PQping(), PQpingParams(), pqPipelineFlush(), pqPipelineProcessQueue(), PQpipelineStatus(), PQpipelineSync(), pqPipelineSyncInternal(), PQport(), PQprepare(), pqPrepareAsyncResult(), PQprotocolVersion(), pqPutc(), PQputCopyData(), PQputCopyEnd(), pqPutInt(), PQputline(), pqPutMsgBytes(), pqPutMsgEnd(), pqPutMsgStart(), PQputnbytes(), pqPutnchar(), pqPuts(), pqReadData(), pqReadReady(), pqRecycleCmdQueueEntry(), PQregisterEventProc(), pqReleaseConnHosts(), PQrequestCancel(), PQreset(), PQresetPoll(), PQresetStart(), pqRowProcessor(), pqSaveErrorResult(), pqSaveParameterStatus(), pqSaveWriteError(), pqsecure_close(), pqsecure_open_client(), pqsecure_open_gss(), pqsecure_raw_read(), pqsecure_raw_write(), pqsecure_read(), pqsecure_write(), PQsendClosePortal(), PQsendClosePrepared(), PQsendDescribePortal(), PQsendDescribePrepared(), PQsendFlushRequest(), PQsendPipelineSync(), PQsendPrepare(), PQsendQuery(), PQsendQueryContinue(), PQsendQueryGuts(), PQsendQueryInternal(), PQsendQueryParams(), PQsendQueryPrepared(), PQsendQueryStart(), pqSendSome(), PQsendTypedCommand(), PQserverVersion(), PQsetChunkedRowsMode(), PQsetClientEncoding(), PQsetdbLogin(), PQsetErrorContextVisibility(), PQsetErrorVerbosity(), PQsetInstanceData(), PQsetnonblocking(), PQsetNoticeProcessor(), PQsetNoticeReceiver(), PQsetSingleRowMode(), PQsetTraceFlags(), pqSkipnchar(), PQsocket(), pqSocketCheck(), PQssl_passwd_cb(), PQsslAttribute(), PQsslAttributeNames(), PQsslInUse(), PQsslStruct(), PQstatus(), PQtrace(), pqTraceOutputCharResponse(), pqTraceOutputMessage(), pqTraceOutputNoTypeByteMessage(), PQtransactionStatus(), PQtty(), PQuntrace(), PQuser(), pqWait(), pqWaitTimed(), pqWriteReady(), 
prepare_vacuum_command(), prepareToTerminate(), process_result(), ProcessKeepaliveMsg(), processSQLNamePattern(), ProcessWALDataMsg(), prohibit_crossdb_refs(), prompt_user(), read_server_final_message(), read_server_first_message(), ReceiveArchiveStream(), ReceiveBackupManifest(), ReceiveBackupManifestInMemory(), ReceiveCopyData(), ReceiveTarFile(), ReceiveXlogStream(), reindex_all_databases(), reindex_one_database(), release_conn_addrinfo(), report_flow_error(), retrieve_objects(), RetrieveDataDirCreatePerm(), RetrieveWalSegSize(), run_oauth_flow(), run_permutation(), run_reindex_command(), run_simple_command(), run_simple_query(), run_vacuum_command(), RunIdentifySystem(), scram_exchange(), scram_init(), select_next_encryption_method(), send_cancellable_query_impl(), sendFeedback(), sendFeedback(), sendTerminateConn(), server_is_in_recovery(), set_archive_cancel_info(), set_frozenxids(), set_replication_progress(), SetCancelConn(), setKeepalivesCount(), setKeepalivesIdle(), setKeepalivesInterval(), setTCPUserTimeout(), setup_connection(), setup_oauth_parameters(), setup_publisher(), setup_recovery(), setup_subscriber(), setup_token_request(), sql_conn(), sql_exec(), sql_exec_dumpalldbs(), sql_exec_dumpalltables(), sql_exec_dumpalltbspc(), sql_exec_searchtables(), ssl_set_pgconn_bio(), start_device_authz(), start_postmaster(), start_token_request(), StartLogStreamer(), store_conn_addrinfo(), storeQueryResult(), StreamLog(), StreamLogicalLog(), TableCommandResultHandler(), test_cancel(), test_disallowed_in_pipeline(), test_multi_pipelines(), test_nosync(), test_pipeline_abort(), test_pipeline_idle(), test_pipelined_insert(), test_prepared(), test_protocol_version(), test_simple_pipeline(), test_singlerowmode(), test_transaction(), test_uniqviol(), try_complete_step(), useKeepalives(), vacuum_all_databases(), vacuum_one_database(), vacuumlo(), verify_btree_slot_handler(), verify_heap_slot_handler(), wait_for_end_recovery(), wait_on_slots(), and 
wait_until_connected().

◆ connection_string

char* connection_string = NULL

Definition at line 45 of file streamutil.c.

Referenced by BaseBackup(), ConnectDatabase(), GetConnection(), and main().

◆ dbgetpassword

int dbgetpassword = 0

Definition at line 50 of file streamutil.c.

Referenced by GetConnection(), and main().

◆ dbhost

char* dbhost = NULL

Definition at line 46 of file streamutil.c.

Referenced by GetConnection(), and main().

◆ dbname

◆ dbport

char* dbport = NULL

Definition at line 48 of file streamutil.c.

Referenced by GetConnection(), and main().

◆ dbuser

char* dbuser = NULL

Definition at line 47 of file streamutil.c.

Referenced by GetConnection(), and main().

◆ password

◆ progname

const char* progname

Definition at line 44 of file streamutil.c.

Referenced by GetConnection().

◆ WalSegSz