21 #include "catalog/pg_type_d.h"
/*
 * pg_debug: print a printf-style diagnostic message to stderr.
 *
 * The arguments are handed unchanged to fprintf(stderr, ...), so the first
 * argument must be a format string.  The do { ... } while (0) wrapper makes
 * the expansion behave as a single statement, so the macro can be used
 * safely as the body of an unbraced if/else.
 */
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
/*
 * pg_fatal: convenience wrapper around pg_fatal_impl().
 *
 * Supplies __LINE__ as the first argument so that pg_fatal_impl() receives
 * the source line of the call site, followed by the caller's printf-style
 * format string and arguments.
 */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
73 pg_fatal_impl(
int line, const
char *
fmt,...)
94 fprintf(stderr,
"test error cases... ");
97 pg_fatal(
"Expected blocking connection mode");
100 pg_fatal(
"Unable to enter pipeline mode");
103 pg_fatal(
"Pipeline mode not activated properly");
108 pg_fatal(
"PQexec should fail in pipeline mode but succeeded");
110 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111 pg_fatal(
"did not get expected error message; got: \"%s\"",
116 pg_fatal(
"PQsendQuery should fail in pipeline mode but succeeded");
118 "PQsendQuery not allowed in pipeline mode\n") != 0)
119 pg_fatal(
"did not get expected error message; got: \"%s\"",
124 pg_fatal(
"re-entering pipeline mode should be a no-op but failed");
127 pg_fatal(
"PQisBusy should return 0 when idle in pipeline mode, returned 1");
131 pg_fatal(
"couldn't exit idle empty pipeline mode");
134 pg_fatal(
"Pipeline mode not terminated properly");
138 pg_fatal(
"pipeline mode exit when not in pipeline mode should succeed but failed");
143 pg_fatal(
"PQexec should succeed after exiting pipeline mode but failed with: %s",
153 const char *dummy_params[1] = {
"1"};
154 Oid dummy_param_oids[1] = {INT4OID};
156 fprintf(stderr,
"multi pipeline... ");
166 dummy_params, NULL, NULL, 0) != 1)
173 dummy_params, NULL, NULL, 0) != 1)
182 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
186 pg_fatal(
"Unexpected result code %s from first pipeline item",
192 pg_fatal(
"PQgetResult returned something extra after first result");
195 pg_fatal(
"exiting pipeline mode after query but before sync succeeded incorrectly");
199 pg_fatal(
"PQgetResult returned null when sync result expected: %s",
203 pg_fatal(
"Unexpected result code %s instead of sync result, error: %s",
211 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
215 pg_fatal(
"Unexpected result code %s from second pipeline item",
220 pg_fatal(
"Expected null result, got %s",
225 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
229 pg_fatal(
"Unexpected result code %s from second pipeline sync",
234 pg_fatal(
"Fell out of pipeline mode somehow");
238 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
242 pg_fatal(
"exiting pipeline mode didn't seem to work");
264 pg_fatal(
"could not enter pipeline mode");
265 for (
int i = 0;
i < numqueries;
i++)
271 0, NULL, NULL, NULL, NULL, 0) != 1)
278 FD_ZERO(&input_mask);
279 FD_SET(sock, &input_mask);
282 if (
select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
293 pg_fatal(
"failed to send flush request");
305 pg_fatal(
"got unexpected NULL result after %d results", results);
321 if (results == numqueries)
347 const char *dummy_params[1] = {
"1"};
348 Oid dummy_param_oids[1] = {INT4OID};
353 fprintf(stderr,
"aborted pipeline... ");
371 dummy_params[0] =
"1";
373 dummy_params, NULL, NULL, 0) != 1)
377 1, dummy_param_oids, dummy_params,
381 dummy_params[0] =
"2";
383 dummy_params, NULL, NULL, 0) != 1)
389 dummy_params[0] =
"3";
391 dummy_params, NULL, NULL, 0) != 1)
392 pg_fatal(
"dispatching second-pipeline insert failed: %s",
409 pg_fatal(
"Unexpected result status %s: %s",
416 pg_fatal(
"Expected null result, got %s",
424 pg_fatal(
"Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
430 pg_fatal(
"Expected null result, got %s",
441 pg_fatal(
"pipeline should be flagged as aborted but isn't");
448 pg_fatal(
"Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
457 pg_fatal(
"pipeline should be flagged as aborted but isn't");
461 pg_fatal(
"Fell out of pipeline mode somehow");
473 pg_fatal(
"Unexpected result code from first pipeline sync\n"
474 "Expected PGRES_PIPELINE_SYNC, got %s",
479 pg_fatal(
"sync should've cleared the aborted flag but didn't");
483 pg_fatal(
"Fell out of pipeline mode somehow");
490 pg_fatal(
"Unexpected result code %s from first item in second pipeline",
502 pg_fatal(
"Unexpected result code %s from second pipeline sync",
507 pg_fatal(
"Expected null result, got %s: %s",
523 pg_fatal(
"expected error about multiple commands, got %s",
534 pg_fatal(
"did not get cannot-insert-multiple-commands error");
539 pg_fatal(
"Unexpected result code %s from pipeline sync",
545 0, NULL, NULL, NULL, NULL, 0) != 1)
562 pg_fatal(
"expected division-by-zero, got: %s (%s)",
565 printf(
"got expected division-by-zero\n");
574 pg_fatal(
"did not get division-by-zero error");
581 pg_fatal(
"Unexpected result code %s from third pipeline sync",
587 pg_fatal(
"Fell out of pipeline mode somehow");
591 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
595 pg_fatal(
"exiting pipeline mode didn't seem to work");
615 pg_fatal(
"Expected tuples, got %s: %s",
623 if (strcmp(
val,
"3") != 0)
624 pg_fatal(
"expected only insert with value 3, got %s",
val);
648 Oid insert_param_oids[2] = {INT4OID, INT8OID};
649 const char *insert_params[2];
657 insert_params[0] = insert_param_0;
658 insert_params[1] = insert_param_1;
660 rows_to_send = rows_to_receive = n_rows;
675 sql =
"BEGIN TRANSACTION";
696 0, NULL, NULL, NULL, NULL, 0) != 1)
728 FD_ZERO(&input_mask);
729 FD_SET(sock, &input_mask);
730 FD_ZERO(&output_mask);
731 FD_SET(sock, &output_mask);
733 if (
select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
743 if (FD_ISSET(sock, &input_mask))
751 const char *cmdtag =
"";
771 cmdtag =
"DROP TABLE";
775 cmdtag =
"CREATE TABLE";
786 if (rows_to_receive == 0)
805 pg_fatal(
"%s reported status %s, expected %s\n"
806 "Error message: \"%s\"",
811 pg_fatal(
"%s expected command tag '%s', got '%s'",
821 if (FD_ISSET(sock, &output_mask))
832 2, insert_params, NULL, NULL, 0) == 1)
834 pg_debug(
"sent row %d\n", rows_to_send);
837 if (rows_to_send == 0)
846 fprintf(stderr,
"WARNING: failed to send insert #%d: %s\n",
853 0, NULL, NULL, NULL, NULL, 0) == 1)
860 fprintf(stderr,
"WARNING: failed to send commit: %s\n",
873 fprintf(stderr,
"WARNING: pipeline sync failed: %s\n",
882 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
895 Oid param_oids[1] = {INT4OID};
896 Oid expected_oids[4];
899 fprintf(stderr,
"prepared... ");
907 expected_oids[0] = INT4OID;
908 expected_oids[1] = TEXTOID;
909 expected_oids[2] = NUMERICOID;
910 expected_oids[3] = INTERVALOID;
918 pg_fatal(
"PQgetResult returned null");
928 pg_fatal(
"PQgetResult returned NULL");
932 pg_fatal(
"expected %zd columns, got %d",
937 if (typ != expected_oids[
i])
938 pg_fatal(
"field %d: expected type %u, got %u",
939 i, expected_oids[
i], typ);
950 fprintf(stderr,
"closing statement..");
958 pg_fatal(
"expected non-NULL result");
985 fprintf(stderr,
"creating portal... ");
987 PQexec(
conn,
"DECLARE cursor_one CURSOR FOR SELECT 1");
995 pg_fatal(
"PQgetResult returned null");
1001 pg_fatal(
"portal: expected type %u, got %u",
1011 fprintf(stderr,
"closing portal... ");
1019 pg_fatal(
"expected non-NULL result");
1053 int *n_notices = (
int *)
arg;
1056 fprintf(stderr,
"NOTICE %d: %s", *n_notices, message);
1066 fprintf(stderr,
"\npipeline idle...\n");
1078 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
1081 pg_fatal(
"unexpected result code %s from first pipeline item",
1086 pg_fatal(
"did not receive terminating NULL");
1090 pg_fatal(
"exiting pipeline succeeded when it shouldn't");
1092 strlen(
"cannot exit pipeline mode")) != 0)
1093 pg_fatal(
"did not get expected error; got: %s",
1098 pg_fatal(
"unexpected result code %s from second pipeline item",
1103 pg_fatal(
"did not receive terminating NULL");
1108 pg_fatal(
"got %d notice(s)", n_notices);
1114 if (
PQsendQueryParams(
conn,
"SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1119 pg_fatal(
"unexpected NULL result received");
1131 const char *dummy_params[1] = {
"1"};
1132 Oid dummy_param_oids[1] = {INT4OID};
1134 fprintf(stderr,
"simple pipeline... ");
1145 pg_fatal(
"Expected blocking connection mode");
1151 1, dummy_param_oids, dummy_params,
1152 NULL, NULL, 0) != 1)
1156 pg_fatal(
"exiting pipeline mode with work in progress should fail, but succeeded");
1163 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
1167 pg_fatal(
"Unexpected result code %s from first pipeline item",
1174 pg_fatal(
"PQgetResult returned something extra after first query result.");
1181 pg_fatal(
"exiting pipeline mode after query but before sync succeeded incorrectly");
1185 pg_fatal(
"PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1189 pg_fatal(
"Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1196 pg_fatal(
"PQgetResult returned something extra after pipeline end: %s",
1201 pg_fatal(
"Fell out of pipeline mode somehow");
1205 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
1209 pg_fatal(
"Exiting pipeline mode didn't seem to work");
1219 bool pipeline_ended =
false;
1222 pg_fatal(
"failed to enter pipeline mode: %s",
1226 for (
i = 0;
i < 3;
i++)
1233 "SELECT generate_series(42, $1)",
1236 (
const char **) param,
1240 pg_fatal(
"failed to send query: %s",
1247 for (
i = 0; !pipeline_ended;
i++)
1250 bool saw_ending_tuplesok;
1251 bool isSingleTuple =
false;
1257 pg_fatal(
"PQsetSingleRowMode() failed for i=%d",
i);
1261 saw_ending_tuplesok =
false;
1268 fprintf(stderr,
"end of pipeline reached\n");
1269 pipeline_ended =
true;
1272 pg_fatal(
"Expected three results, got %d",
i);
1280 pg_fatal(
"Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1283 pg_fatal(
"Expected PGRES_TUPLES_OK for query %d, got %s",
1293 saw_ending_tuplesok =
true;
1297 fprintf(stderr,
"all tuples received in query %d\n",
i);
1299 pg_fatal(
"Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1304 isSingleTuple =
true;
1313 if (!pipeline_ended && !saw_ending_tuplesok)
1314 pg_fatal(
"didn't get expected terminating TUPLES_OK");
1323 0, NULL, NULL, NULL, NULL, 0) != 1)
1324 pg_fatal(
"failed to send query: %s",
1327 pg_fatal(
"failed to send flush request");
1329 pg_fatal(
"PQsetSingleRowMode() failed");
1334 pg_fatal(
"Expected PGRES_SINGLE_TUPLE, got %s",
1340 pg_fatal(
"Expected PGRES_TUPLES_OK, got %s",
1346 0, NULL, NULL, NULL, NULL, 0) != 1)
1347 pg_fatal(
"failed to send query: %s",
1350 pg_fatal(
"failed to send flush request");
1355 pg_fatal(
"Expected PGRES_TUPLES_OK, got %s",
1378 "CREATE TABLE pq_pipeline_tst (id int)");
1380 pg_fatal(
"failed to create test table: %s",
1385 pg_fatal(
"failed to enter pipeline mode: %s",
1388 pg_fatal(
"could not send prepare on pipeline: %s",
1393 0, NULL, NULL, NULL, NULL, 0) != 1)
1394 pg_fatal(
"failed to send query: %s",
1398 0, NULL, NULL, NULL, NULL, 0) != 1)
1399 pg_fatal(
"failed to send query: %s",
1407 pg_fatal(
"failed to execute prepared: %s",
1412 "INSERT INTO pq_pipeline_tst VALUES (1)",
1413 0, NULL, NULL, NULL, NULL, 0) != 1)
1414 pg_fatal(
"failed to send query: %s",
1425 "INSERT INTO pq_pipeline_tst VALUES (2)",
1426 0, NULL, NULL, NULL, NULL, 0) != 1)
1427 pg_fatal(
"failed to send query: %s",
1438 pg_fatal(
"failed to execute prepared: %s",
1446 "INSERT INTO pq_pipeline_tst VALUES (3)",
1447 0, NULL, NULL, NULL, NULL, 0) != 1)
1448 pg_fatal(
"failed to send query: %s",
1458 expect_null =
false;
1459 for (
int i = 0;;
i++)
1466 printf(
"%d: got NULL result\n",
i);
1468 pg_fatal(
"did not expect NULL here");
1469 expect_null =
false;
1480 printf(
": command didn't run because pipeline aborted\n");
1494 pg_fatal(
"returned something extra after all the syncs: %s",
1507 pg_fatal(
"did not get expected tuple");
1523 Oid paramTypes[2] = {INT8OID, INT8OID};
1524 const char *paramValues[2];
1530 bool read_done =
false;
1531 bool write_done =
false;
1532 bool error_sent =
false;
1533 bool got_error =
false;
1539 fprintf(stderr,
"uniqviol ...");
1543 paramValues[0] = paramValue0;
1544 paramValues[1] = paramValue1;
1548 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1557 "insert into ppln_uniqviol values ($1, $2) returning id",
1563 pg_fatal(
"failed to enter pipeline mode");
1578 if (results >= numsent)
1587 if (new_error && got_error)
1589 got_error |= new_error;
1590 if (results++ >= numsent - 1)
1602 FD_SET(sock, &out_fds);
1605 FD_SET(sock, &in_fds);
1607 if (
select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1621 if (!write_done && FD_ISSET(sock, &out_fds))
1631 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1633 sprintf(paramValue0,
"%d", numsent / 2);
1640 sprintf(paramValue0,
"%d", ctr++);
1648 if (socketful != 0 && numsent % socketful == 42 && error_sent)
1651 pg_fatal(
"failed to send flush request");
1653 fprintf(stderr,
"\ndone writing\n");
1665 socketful = numsent;
1666 fprintf(stderr,
"\nswitch to reading\n");
1675 pg_fatal(
"did not get expected error");
1690 bool got_error =
false;
1719 fprintf(stderr,
"result %d/%d: pipeline aborted\n", results, numsent);
1741 fprintf(stderr,
"\nOptions:\n");
1742 fprintf(stderr,
" -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1743 fprintf(stderr,
" -r NUMROWS use NUMROWS as the test size\n");
1749 printf(
"disallowed_in_pipeline\n");
1750 printf(
"multi_pipelines\n");
1752 printf(
"pipeline_abort\n");
1753 printf(
"pipeline_idle\n");
1754 printf(
"pipelined_insert\n");
1756 printf(
"simple_pipeline\n");
1765 const char *conninfo =
"";
1769 int numrows = 10000;
1773 while ((
c =
getopt(argc, argv,
"r:t:")) != -1)
1779 numrows = strtol(
optarg, NULL, 10);
1780 if (errno != 0 || numrows <= 0)
1782 fprintf(stderr,
"couldn't parse \"%s\" as a positive integer\n",
1804 if (strcmp(testname,
"tests") == 0)
1820 fprintf(stderr,
"Connection to database failed: %s\n",
1850 if (strcmp(testname,
"disallowed_in_pipeline") == 0)
1852 else if (strcmp(testname,
"multi_pipelines") == 0)
1854 else if (strcmp(testname,
"nosync") == 0)
1856 else if (strcmp(testname,
"pipeline_abort") == 0)
1858 else if (strcmp(testname,
"pipeline_idle") == 0)
1860 else if (strcmp(testname,
"pipelined_insert") == 0)
1862 else if (strcmp(testname,
"prepared") == 0)
1864 else if (strcmp(testname,
"simple_pipeline") == 0)
1866 else if (strcmp(testname,
"singlerow") == 0)
1868 else if (strcmp(testname,
"transaction") == 0)
1870 else if (strcmp(testname,
"uniqviol") == 0)
1874 fprintf(stderr,
"\"%s\" is not a recognized test name\n", testname);
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
int PQsocket(const PGconn *conn)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsetSingleRowMode(PGconn *conn)
int PQflush(PGconn *conn)
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Oid PQftype(const PGresult *res, int field_num)
int PQexitPipelineMode(PGconn *conn)
int PQsendClosePortal(PGconn *conn, const char *portal)
int PQenterPipelineMode(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
int PQsendClosePrepared(PGconn *conn, const char *stmt)
char * PQresultErrorMessage(const PGresult *res)
PGresult * PQclosePortal(PGconn *conn, const char *portal)
int PQntuples(const PGresult *res)
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
char * PQcmdStatus(PGresult *res)
int PQsetnonblocking(PGconn *conn, int arg)
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
int PQsendQuery(PGconn *conn, const char *query)
int PQpipelineSync(PGconn *conn)
char * PQresStatus(ExecStatusType status)
int PQsendDescribePortal(PGconn *conn, const char *portal)
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
int PQisBusy(PGconn *conn)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsendFlushRequest(PGconn *conn)
char * PQresultErrorField(const PGresult *res, int fieldcode)
int PQisnonblocking(const PGconn *conn)
int PQnfields(const PGresult *res)
PGresult * PQgetResult(PGconn *conn)
void PQtrace(PGconn *conn, FILE *debug_port)
void PQsetTraceFlags(PGconn *conn, int flags)
char * pg_strdup(const char *in)
#define PQTRACE_SUPPRESS_TIMESTAMPS
#define PQTRACE_REGRESS_MODE
static void usage(const char *progname)
static void const char * fmt
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void const char fflush(stdout)
static void exit_nicely(PGconn *conn)
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static const char *const insert_sql
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
int getopt(int nargc, char *const *nargv, const char *ostr)
PGDLLIMPORT char * optarg
char * psprintf(const char *fmt,...)
#define select(n, r, w, e, timeout)