21 #include "catalog/pg_type_d.h"
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
73 pg_fatal_impl(
int line, const
char *
fmt,...)
94 fprintf(stderr,
"test error cases... ");
97 pg_fatal(
"Expected blocking connection mode");
100 pg_fatal(
"Unable to enter pipeline mode");
103 pg_fatal(
"Pipeline mode not activated properly");
108 pg_fatal(
"PQexec should fail in pipeline mode but succeeded");
110 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111 pg_fatal(
"did not get expected error message; got: \"%s\"",
116 pg_fatal(
"PQsendQuery should fail in pipeline mode but succeeded");
118 "PQsendQuery not allowed in pipeline mode\n") != 0)
119 pg_fatal(
"did not get expected error message; got: \"%s\"",
124 pg_fatal(
"re-entering pipeline mode should be a no-op but failed");
127 pg_fatal(
"PQisBusy should return 0 when idle in pipeline mode, returned 1");
131 pg_fatal(
"couldn't exit idle empty pipeline mode");
134 pg_fatal(
"Pipeline mode not terminated properly");
138 pg_fatal(
"pipeline mode exit when not in pipeline mode should succeed but failed");
143 pg_fatal(
"PQexec should succeed after exiting pipeline mode but failed with: %s",
153 const char *dummy_params[1] = {
"1"};
154 Oid dummy_param_oids[1] = {INT4OID};
156 fprintf(stderr,
"multi pipeline... ");
166 dummy_params, NULL, NULL, 0) != 1)
173 dummy_params, NULL, NULL, 0) != 1)
182 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
186 pg_fatal(
"Unexpected result code %s from first pipeline item",
192 pg_fatal(
"PQgetResult returned something extra after first result");
195 pg_fatal(
"exiting pipeline mode after query but before sync succeeded incorrectly");
199 pg_fatal(
"PQgetResult returned null when sync result expected: %s",
203 pg_fatal(
"Unexpected result code %s instead of sync result, error: %s",
211 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
215 pg_fatal(
"Unexpected result code %s from second pipeline item",
220 pg_fatal(
"Expected null result, got %s",
225 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
229 pg_fatal(
"Unexpected result code %s from second pipeline sync",
234 pg_fatal(
"Fell out of pipeline mode somehow");
238 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
242 pg_fatal(
"exiting pipeline mode didn't seem to work");
264 pg_fatal(
"could not enter pipeline mode");
265 for (
int i = 0;
i < numqueries;
i++)
271 0, NULL, NULL, NULL, NULL, 0) != 1)
278 FD_ZERO(&input_mask);
279 FD_SET(sock, &input_mask);
282 if (
select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
293 pg_fatal(
"failed to send flush request");
305 pg_fatal(
"got unexpected NULL result after %d results", results);
321 if (results == numqueries)
347 const char *dummy_params[1] = {
"1"};
348 Oid dummy_param_oids[1] = {INT4OID};
353 fprintf(stderr,
"aborted pipeline... ");
371 dummy_params[0] =
"1";
373 dummy_params, NULL, NULL, 0) != 1)
377 1, dummy_param_oids, dummy_params,
381 dummy_params[0] =
"2";
383 dummy_params, NULL, NULL, 0) != 1)
389 dummy_params[0] =
"3";
391 dummy_params, NULL, NULL, 0) != 1)
392 pg_fatal(
"dispatching second-pipeline insert failed: %s",
409 pg_fatal(
"Unexpected result status %s: %s",
416 pg_fatal(
"Expected null result, got %s",
424 pg_fatal(
"Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
430 pg_fatal(
"Expected null result, got %s",
441 pg_fatal(
"pipeline should be flagged as aborted but isn't");
448 pg_fatal(
"Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
457 pg_fatal(
"pipeline should be flagged as aborted but isn't");
461 pg_fatal(
"Fell out of pipeline mode somehow");
473 pg_fatal(
"Unexpected result code from first pipeline sync\n"
474 "Expected PGRES_PIPELINE_SYNC, got %s",
479 pg_fatal(
"sync should've cleared the aborted flag but didn't");
483 pg_fatal(
"Fell out of pipeline mode somehow");
490 pg_fatal(
"Unexpected result code %s from first item in second pipeline",
502 pg_fatal(
"Unexpected result code %s from second pipeline sync",
507 pg_fatal(
"Expected null result, got %s: %s",
523 pg_fatal(
"expected error about multiple commands, got %s",
534 pg_fatal(
"did not get cannot-insert-multiple-commands error");
539 pg_fatal(
"Unexpected result code %s from pipeline sync",
545 0, NULL, NULL, NULL, NULL, 0) != 1)
562 pg_fatal(
"expected division-by-zero, got: %s (%s)",
565 printf(
"got expected division-by-zero\n");
574 pg_fatal(
"did not get division-by-zero error");
581 pg_fatal(
"Unexpected result code %s from third pipeline sync",
587 pg_fatal(
"Fell out of pipeline mode somehow");
591 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
595 pg_fatal(
"exiting pipeline mode didn't seem to work");
615 pg_fatal(
"Expected tuples, got %s: %s",
623 if (strcmp(
val,
"3") != 0)
624 pg_fatal(
"expected only insert with value 3, got %s",
val);
648 Oid insert_param_oids[2] = {INT4OID, INT8OID};
649 const char *insert_params[2];
657 insert_params[0] = insert_param_0;
658 insert_params[1] = insert_param_1;
660 rows_to_send = rows_to_receive = n_rows;
675 sql =
"BEGIN TRANSACTION";
696 0, NULL, NULL, NULL, NULL, 0) != 1)
728 FD_ZERO(&input_mask);
729 FD_SET(sock, &input_mask);
730 FD_ZERO(&output_mask);
731 FD_SET(sock, &output_mask);
733 if (
select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
743 if (FD_ISSET(sock, &input_mask))
751 const char *cmdtag =
"";
752 const char *description =
"";
771 cmdtag =
"DROP TABLE";
775 cmdtag =
"CREATE TABLE";
780 description =
"PREPARE";
786 if (rows_to_receive == 0)
795 description =
"SYNC";
805 pg_fatal(
"%s reported status %s, expected %s\n"
806 "Error message: \"%s\"",
811 pg_fatal(
"%s expected command tag '%s', got '%s'",
814 pg_debug(
"Got %s OK\n", cmdtag[0] !=
'\0' ? cmdtag : description);
821 if (FD_ISSET(sock, &output_mask))
832 2, insert_params, NULL, NULL, 0) == 1)
834 pg_debug(
"sent row %d\n", rows_to_send);
837 if (rows_to_send == 0)
846 fprintf(stderr,
"WARNING: failed to send insert #%d: %s\n",
853 0, NULL, NULL, NULL, NULL, 0) == 1)
860 fprintf(stderr,
"WARNING: failed to send commit: %s\n",
873 fprintf(stderr,
"WARNING: pipeline sync failed: %s\n",
882 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
895 Oid param_oids[1] = {INT4OID};
896 Oid expected_oids[4];
899 fprintf(stderr,
"prepared... ");
907 expected_oids[0] = INT4OID;
908 expected_oids[1] = TEXTOID;
909 expected_oids[2] = NUMERICOID;
910 expected_oids[3] = INTERVALOID;
918 pg_fatal(
"PQgetResult returned null");
928 pg_fatal(
"PQgetResult returned NULL");
932 pg_fatal(
"expected %zd columns, got %d",
937 if (typ != expected_oids[
i])
938 pg_fatal(
"field %d: expected type %u, got %u",
939 i, expected_oids[
i], typ);
954 PQexec(
conn,
"DECLARE cursor_one CURSOR FOR SELECT 1");
968 pg_fatal(
"portal: expected type %u, got %u",
988 int *n_notices = (
int *)
arg;
991 fprintf(stderr,
"NOTICE %d: %s", *n_notices, message);
1001 fprintf(stderr,
"\npipeline idle...\n");
1013 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
1016 pg_fatal(
"unexpected result code %s from first pipeline item",
1021 pg_fatal(
"did not receive terminating NULL");
1025 pg_fatal(
"exiting pipeline succeeded when it shouldn't");
1027 strlen(
"cannot exit pipeline mode")) != 0)
1028 pg_fatal(
"did not get expected error; got: %s",
1033 pg_fatal(
"unexpected result code %s from second pipeline item",
1038 pg_fatal(
"did not receive terminating NULL");
1043 pg_fatal(
"got %d notice(s)", n_notices);
1049 if (
PQsendQueryParams(
conn,
"SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1054 pg_fatal(
"unexpected NULL result received");
1066 const char *dummy_params[1] = {
"1"};
1067 Oid dummy_param_oids[1] = {INT4OID};
1069 fprintf(stderr,
"simple pipeline... ");
1080 pg_fatal(
"Expected blocking connection mode");
1086 1, dummy_param_oids, dummy_params,
1087 NULL, NULL, 0) != 1)
1091 pg_fatal(
"exiting pipeline mode with work in progress should fail, but succeeded");
1098 pg_fatal(
"PQgetResult returned null when there's a pipeline item: %s",
1102 pg_fatal(
"Unexpected result code %s from first pipeline item",
1109 pg_fatal(
"PQgetResult returned something extra after first query result.");
1116 pg_fatal(
"exiting pipeline mode after query but before sync succeeded incorrectly");
1120 pg_fatal(
"PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1124 pg_fatal(
"Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1131 pg_fatal(
"PQgetResult returned something extra after pipeline end: %s",
1136 pg_fatal(
"Fell out of pipeline mode somehow");
1140 pg_fatal(
"attempt to exit pipeline mode failed when it should've succeeded: %s",
1144 pg_fatal(
"Exiting pipeline mode didn't seem to work");
1154 bool pipeline_ended =
false;
1157 pg_fatal(
"failed to enter pipeline mode: %s",
1161 for (
i = 0;
i < 3;
i++)
1168 "SELECT generate_series(42, $1)",
1171 (
const char **) param,
1175 pg_fatal(
"failed to send query: %s",
1182 for (
i = 0; !pipeline_ended;
i++)
1185 bool saw_ending_tuplesok;
1186 bool isSingleTuple =
false;
1192 pg_fatal(
"PQsetSingleRowMode() failed for i=%d",
i);
1196 saw_ending_tuplesok =
false;
1203 fprintf(stderr,
"end of pipeline reached\n");
1204 pipeline_ended =
true;
1207 pg_fatal(
"Expected three results, got %d",
i);
1215 pg_fatal(
"Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1218 pg_fatal(
"Expected PGRES_TUPLES_OK for query %d, got %s",
1228 saw_ending_tuplesok =
true;
1232 fprintf(stderr,
"all tuples received in query %d\n",
i);
1234 pg_fatal(
"Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1239 isSingleTuple =
true;
1248 if (!pipeline_ended && !saw_ending_tuplesok)
1249 pg_fatal(
"didn't get expected terminating TUPLES_OK");
1258 0, NULL, NULL, NULL, NULL, 0) != 1)
1259 pg_fatal(
"failed to send query: %s",
1262 pg_fatal(
"failed to send flush request");
1264 pg_fatal(
"PQsetSingleRowMode() failed");
1269 pg_fatal(
"Expected PGRES_SINGLE_TUPLE, got %s",
1275 pg_fatal(
"Expected PGRES_TUPLES_OK, got %s",
1281 0, NULL, NULL, NULL, NULL, 0) != 1)
1282 pg_fatal(
"failed to send query: %s",
1285 pg_fatal(
"failed to send flush request");
1290 pg_fatal(
"Expected PGRES_TUPLES_OK, got %s",
1313 "CREATE TABLE pq_pipeline_tst (id int)");
1315 pg_fatal(
"failed to create test table: %s",
1320 pg_fatal(
"failed to enter pipeline mode: %s",
1323 pg_fatal(
"could not send prepare on pipeline: %s",
1328 0, NULL, NULL, NULL, NULL, 0) != 1)
1329 pg_fatal(
"failed to send query: %s",
1333 0, NULL, NULL, NULL, NULL, 0) != 1)
1334 pg_fatal(
"failed to send query: %s",
1342 pg_fatal(
"failed to execute prepared: %s",
1347 "INSERT INTO pq_pipeline_tst VALUES (1)",
1348 0, NULL, NULL, NULL, NULL, 0) != 1)
1349 pg_fatal(
"failed to send query: %s",
1360 "INSERT INTO pq_pipeline_tst VALUES (2)",
1361 0, NULL, NULL, NULL, NULL, 0) != 1)
1362 pg_fatal(
"failed to send query: %s",
1373 pg_fatal(
"failed to execute prepared: %s",
1381 "INSERT INTO pq_pipeline_tst VALUES (3)",
1382 0, NULL, NULL, NULL, NULL, 0) != 1)
1383 pg_fatal(
"failed to send query: %s",
1393 expect_null =
false;
1394 for (
int i = 0;;
i++)
1401 printf(
"%d: got NULL result\n",
i);
1403 pg_fatal(
"did not expect NULL here");
1404 expect_null =
false;
1415 printf(
": command didn't run because pipeline aborted\n");
1429 pg_fatal(
"returned something extra after all the syncs: %s",
1442 pg_fatal(
"did not get expected tuple");
1458 Oid paramTypes[2] = {INT8OID, INT8OID};
1459 const char *paramValues[2];
1465 bool read_done =
false;
1466 bool write_done =
false;
1467 bool error_sent =
false;
1468 bool got_error =
false;
1474 fprintf(stderr,
"uniqviol ...");
1478 paramValues[0] = paramValue0;
1479 paramValues[1] = paramValue1;
1483 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1492 "insert into ppln_uniqviol values ($1, $2) returning id",
1498 pg_fatal(
"failed to enter pipeline mode");
1513 if (results >= numsent)
1522 if (new_error && got_error)
1524 got_error |= new_error;
1525 if (results++ >= numsent - 1)
1537 FD_SET(sock, &out_fds);
1540 FD_SET(sock, &in_fds);
1542 if (
select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1556 if (!write_done && FD_ISSET(sock, &out_fds))
1566 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1568 sprintf(paramValue0,
"%d", numsent / 2);
1575 sprintf(paramValue0,
"%d", ctr++);
1583 if (socketful != 0 && numsent % socketful == 42 && error_sent)
1586 pg_fatal(
"failed to send flush request");
1588 fprintf(stderr,
"\ndone writing\n");
1600 socketful = numsent;
1601 fprintf(stderr,
"\nswitch to reading\n");
1610 pg_fatal(
"did not get expected error");
1625 bool got_error =
false;
1654 fprintf(stderr,
"result %d/%d: pipeline aborted\n", results, numsent);
1676 fprintf(stderr,
"\nOptions:\n");
1677 fprintf(stderr,
" -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1678 fprintf(stderr,
" -r NUMROWS use NUMROWS as the test size\n");
1684 printf(
"disallowed_in_pipeline\n");
1685 printf(
"multi_pipelines\n");
1687 printf(
"pipeline_abort\n");
1688 printf(
"pipeline_idle\n");
1689 printf(
"pipelined_insert\n");
1691 printf(
"simple_pipeline\n");
1700 const char *conninfo =
"";
1704 int numrows = 10000;
1708 while ((
c =
getopt(argc, argv,
"r:t:")) != -1)
1714 numrows = strtol(
optarg, NULL, 10);
1715 if (errno != 0 || numrows <= 0)
1717 fprintf(stderr,
"couldn't parse \"%s\" as a positive integer\n",
1739 if (strcmp(testname,
"tests") == 0)
1755 fprintf(stderr,
"Connection to database failed: %s\n",
1785 if (strcmp(testname,
"disallowed_in_pipeline") == 0)
1787 else if (strcmp(testname,
"multi_pipelines") == 0)
1789 else if (strcmp(testname,
"nosync") == 0)
1791 else if (strcmp(testname,
"pipeline_abort") == 0)
1793 else if (strcmp(testname,
"pipeline_idle") == 0)
1795 else if (strcmp(testname,
"pipelined_insert") == 0)
1797 else if (strcmp(testname,
"prepared") == 0)
1799 else if (strcmp(testname,
"simple_pipeline") == 0)
1801 else if (strcmp(testname,
"singlerow") == 0)
1803 else if (strcmp(testname,
"transaction") == 0)
1805 else if (strcmp(testname,
"uniqviol") == 0)
1809 fprintf(stderr,
"\"%s\" is not a recognized test name\n", testname);
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
int PQsocket(const PGconn *conn)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsetSingleRowMode(PGconn *conn)
int PQflush(PGconn *conn)
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Oid PQftype(const PGresult *res, int field_num)
int PQexitPipelineMode(PGconn *conn)
int PQenterPipelineMode(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQntuples(const PGresult *res)
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
PGresult * PQexec(PGconn *conn, const char *query)
int PQconsumeInput(PGconn *conn)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
char * PQcmdStatus(PGresult *res)
int PQsetnonblocking(PGconn *conn, int arg)
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
int PQsendQuery(PGconn *conn, const char *query)
int PQpipelineSync(PGconn *conn)
char * PQresStatus(ExecStatusType status)
int PQsendDescribePortal(PGconn *conn, const char *portal)
int PQisBusy(PGconn *conn)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsendFlushRequest(PGconn *conn)
char * PQresultErrorField(const PGresult *res, int fieldcode)
int PQisnonblocking(const PGconn *conn)
int PQnfields(const PGresult *res)
PGresult * PQgetResult(PGconn *conn)
void PQtrace(PGconn *conn, FILE *debug_port)
void PQsetTraceFlags(PGconn *conn, int flags)
char * pg_strdup(const char *in)
#define PQTRACE_SUPPRESS_TIMESTAMPS
#define PQTRACE_REGRESS_MODE
static void usage(const char *progname)
static void const char * fmt
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void const char fflush(stdout)
static void exit_nicely(PGconn *conn)
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static const char *const insert_sql
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
int getopt(int nargc, char *const *nargv, const char *ostr)
PGDLLIMPORT char * optarg
char * psprintf(const char *fmt,...)
#define select(n, r, w, e, timeout)