PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
libpq_pipeline.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <sys/select.h>
19#include <sys/time.h>
20
21#include "catalog/pg_type_d.h"
22#include "libpq-fe.h"
23#include "pg_getopt.h"
24
25
26static void exit_nicely(PGconn *conn);
27pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
29static bool process_result(PGconn *conn, PGresult *res, int results,
30 int numsent);
31
32static const char *const progname = "libpq_pipeline";
33
34/* Options and defaults */
35static char *tracefile = NULL; /* path to PQtrace() file */
36
37
38#ifdef DEBUG_OUTPUT
39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40#else
41#define pg_debug(...)
42#endif
43
44static const char *const drop_table_sql =
45"DROP TABLE IF EXISTS pq_pipeline_demo";
46static const char *const create_table_sql =
47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48"int8filler int8);";
49static const char *const insert_sql =
50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51static const char *const insert_sql2 =
52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53
54/* max char length of an int32/64, plus sign and null terminator */
55#define MAXINTLEN 12
56#define MAXINT8LEN 20
57
58static void
60{
62 exit(1);
63}
64
65/*
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
68 */
69
70/*
71 * Print an error to stderr and terminate the program.
72 */
73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74pg_noreturn static void
75pg_fatal_impl(int line, const char *fmt,...)
76{
77 va_list args;
78
79 fflush(stdout);
80
81 fprintf(stderr, "\n%s:%d: ", progname, line);
82 va_start(args, fmt);
83 vfprintf(stderr, fmt, args);
84 va_end(args);
85 Assert(fmt[strlen(fmt) - 1] != '\n');
86 fprintf(stderr, "\n");
87 exit(1);
88}
89
90/*
91 * Check that the query on the given connection got canceled.
92 */
93#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
94static void
96{
97 PGresult *res = NULL;
98
99 res = PQgetResult(conn);
100 if (res == NULL)
101 pg_fatal_impl(line, "PQgetResult returned null: %s",
104 pg_fatal_impl(line, "query did not fail when it was expected");
105 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
106 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
108 PQclear(res);
109
110 while (PQisBusy(conn))
112}
113
114/*
115 * Using monitorConn, query pg_stat_activity to see that the connection with
116 * the given PID is either in the given state, or waiting on the given event
117 * (only one of them can be given).
118 */
119static void
120wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
121 char *state, char *event)
122{
123 const Oid paramTypes[] = {INT4OID, TEXTOID};
124 const char *paramValues[2];
125 char *pidstr = psprintf("%d", procpid);
126
127 Assert((state == NULL) ^ (event == NULL));
128
129 paramValues[0] = pidstr;
130 paramValues[1] = state ? state : event;
131
132 while (true)
133 {
134 PGresult *res;
135 char *value;
136
137 if (state != NULL)
138 res = PQexecParams(monitorConn,
139 "SELECT count(*) FROM pg_stat_activity WHERE "
140 "pid = $1 AND state = $2",
141 2, paramTypes, paramValues, NULL, NULL, 0);
142 else
143 res = PQexecParams(monitorConn,
144 "SELECT count(*) FROM pg_stat_activity WHERE "
145 "pid = $1 AND wait_event = $2",
146 2, paramTypes, paramValues, NULL, NULL, 0);
147
149 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
150 if (PQntuples(res) != 1)
151 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
152 if (PQnfields(res) != 1)
153 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
154 value = PQgetvalue(res, 0, 0);
155 if (strcmp(value, "0") != 0)
156 {
157 PQclear(res);
158 break;
159 }
160 PQclear(res);
161
162 /* wait 10ms before polling again */
163 pg_usleep(10000);
164 }
165
166 pfree(pidstr);
167}
168
169#define send_cancellable_query(conn, monitorConn) \
170 send_cancellable_query_impl(__LINE__, conn, monitorConn)
171static void
173{
174 const char *env_wait;
175 const Oid paramTypes[1] = {INT4OID};
176
177 /*
178 * Wait for the connection to be idle, so that our check for an active
179 * connection below is reliable, instead of possibly seeing an outdated
180 * state.
181 */
182 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
183
184 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
185 if (env_wait == NULL)
186 env_wait = "180";
187
188 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
189 &env_wait, NULL, NULL, 0) != 1)
190 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
191
192 /*
193 * Wait for the sleep to be active, because if the query is not running
194 * yet, the cancel request that we send won't have any effect.
195 */
196 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
197}
198
199/*
200 * Create a new connection with the same conninfo as the given one.
201 */
202static PGconn *
204{
205 PGconn *copyConn;
207 const char **keywords;
208 const char **vals;
209 int nopts = 0;
210 int i;
211
212 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
213 nopts++;
214 nopts++; /* for the NULL terminator */
215
216 keywords = pg_malloc(sizeof(char *) * nopts);
217 vals = pg_malloc(sizeof(char *) * nopts);
218
219 i = 0;
220 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
221 {
222 if (opt->val)
223 {
224 keywords[i] = opt->keyword;
225 vals[i] = opt->val;
226 i++;
227 }
228 }
229 keywords[i] = vals[i] = NULL;
230
231 copyConn = PQconnectdbParams(keywords, vals, false);
232
233 if (PQstatus(copyConn) != CONNECTION_OK)
234 pg_fatal("Connection to database failed: %s",
235 PQerrorMessage(copyConn));
236
237 return copyConn;
238}
239
240/*
241 * Test query cancellation routines
242 */
243static void
245{
246 PGcancel *cancel;
248 PGconn *monitorConn;
249 char errorbuf[256];
250
251 fprintf(stderr, "test cancellations... ");
252
253 if (PQsetnonblocking(conn, 1) != 0)
254 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
255
256 /*
257 * Make a separate connection to the database to monitor the query on the
258 * main connection.
259 */
260 monitorConn = copy_connection(conn);
261 Assert(PQstatus(monitorConn) == CONNECTION_OK);
262
263 /* test PQcancel */
264 send_cancellable_query(conn, monitorConn);
265 cancel = PQgetCancel(conn);
266 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
267 pg_fatal("failed to run PQcancel: %s", errorbuf);
269
270 /* PGcancel object can be reused for the next query */
271 send_cancellable_query(conn, monitorConn);
272 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
273 pg_fatal("failed to run PQcancel: %s", errorbuf);
275
276 PQfreeCancel(cancel);
277
278 /* test PQrequestCancel */
279 send_cancellable_query(conn, monitorConn);
280 if (!PQrequestCancel(conn))
281 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
283
284 /* test PQcancelBlocking */
285 send_cancellable_query(conn, monitorConn);
288 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
291
292 /* test PQcancelCreate and then polling with PQcancelPoll */
293 send_cancellable_query(conn, monitorConn);
296 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
297 while (true)
298 {
299 struct timeval tv;
300 fd_set input_mask;
301 fd_set output_mask;
303 int sock = PQcancelSocket(cancelConn);
304
305 if (pollres == PGRES_POLLING_OK)
306 break;
307
308 FD_ZERO(&input_mask);
309 FD_ZERO(&output_mask);
310 switch (pollres)
311 {
313 pg_debug("polling for reads\n");
314 FD_SET(sock, &input_mask);
315 break;
317 pg_debug("polling for writes\n");
318 FD_SET(sock, &output_mask);
319 break;
320 default:
321 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
322 }
323
324 if (sock < 0)
325 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
326
327 tv.tv_sec = 3;
328 tv.tv_usec = 0;
329
330 while (true)
331 {
332 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
333 {
334 if (errno == EINTR)
335 continue;
336 pg_fatal("select() failed: %m");
337 }
338 break;
339 }
340 }
342 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
344
345 /*
346 * test PQcancelReset works on the cancel connection and it can be reused
347 * afterwards
348 */
350
351 send_cancellable_query(conn, monitorConn);
353 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
354 while (true)
355 {
356 struct timeval tv;
357 fd_set input_mask;
358 fd_set output_mask;
360 int sock = PQcancelSocket(cancelConn);
361
362 if (pollres == PGRES_POLLING_OK)
363 break;
364
365 FD_ZERO(&input_mask);
366 FD_ZERO(&output_mask);
367 switch (pollres)
368 {
370 pg_debug("polling for reads\n");
371 FD_SET(sock, &input_mask);
372 break;
374 pg_debug("polling for writes\n");
375 FD_SET(sock, &output_mask);
376 break;
377 default:
378 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
379 }
380
381 if (sock < 0)
382 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
383
384 tv.tv_sec = 3;
385 tv.tv_usec = 0;
386
387 while (true)
388 {
389 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
390 {
391 if (errno == EINTR)
392 continue;
393 pg_fatal("select() failed: %m");
394 }
395 break;
396 }
397 }
399 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
401
403
404 fprintf(stderr, "ok\n");
405}
406
407static void
409{
410 PGresult *res = NULL;
411
412 fprintf(stderr, "test error cases... ");
413
415 pg_fatal("Expected blocking connection mode");
416
417 if (PQenterPipelineMode(conn) != 1)
418 pg_fatal("Unable to enter pipeline mode");
419
421 pg_fatal("Pipeline mode not activated properly");
422
423 /* PQexec should fail in pipeline mode */
424 res = PQexec(conn, "SELECT 1");
426 pg_fatal("PQexec should fail in pipeline mode but succeeded");
427 if (strcmp(PQerrorMessage(conn),
428 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
429 pg_fatal("did not get expected error message; got: \"%s\"",
431
432 /* PQsendQuery should fail in pipeline mode */
433 if (PQsendQuery(conn, "SELECT 1") != 0)
434 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
435 if (strcmp(PQerrorMessage(conn),
436 "PQsendQuery not allowed in pipeline mode\n") != 0)
437 pg_fatal("did not get expected error message; got: \"%s\"",
439
440 /* Entering pipeline mode when already in pipeline mode is OK */
441 if (PQenterPipelineMode(conn) != 1)
442 pg_fatal("re-entering pipeline mode should be a no-op but failed");
443
444 if (PQisBusy(conn) != 0)
445 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
446
447 /* ok, back to normal command mode */
448 if (PQexitPipelineMode(conn) != 1)
449 pg_fatal("couldn't exit idle empty pipeline mode");
450
452 pg_fatal("Pipeline mode not terminated properly");
453
454 /* exiting pipeline mode when not in pipeline mode should be a no-op */
455 if (PQexitPipelineMode(conn) != 1)
456 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
457
458 /* can now PQexec again */
459 res = PQexec(conn, "SELECT 1");
461 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
463
464 fprintf(stderr, "ok\n");
465}
466
467static void
469{
470 PGresult *res = NULL;
471 const char *dummy_params[1] = {"1"};
472 Oid dummy_param_oids[1] = {INT4OID};
473
474 fprintf(stderr, "multi pipeline... ");
475
476 /*
477 * Queue up a couple of small pipelines and process each without returning
478 * to command mode first.
479 */
480 if (PQenterPipelineMode(conn) != 1)
481 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
482
483 /* first pipeline */
484 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
485 dummy_params, NULL, NULL, 0) != 1)
486 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
487
488 if (PQpipelineSync(conn) != 1)
489 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
490
491 /* second pipeline */
492 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
493 dummy_params, NULL, NULL, 0) != 1)
494 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
495
496 /* Skip flushing once. */
497 if (PQsendPipelineSync(conn) != 1)
498 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
499
500 /* third pipeline */
501 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
502 dummy_params, NULL, NULL, 0) != 1)
503 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
504
505 if (PQpipelineSync(conn) != 1)
506 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
507
508 /* OK, start processing the results */
509
510 /* first pipeline */
511
512 res = PQgetResult(conn);
513 if (res == NULL)
514 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
516
518 pg_fatal("Unexpected result code %s from first pipeline item",
520 PQclear(res);
521 res = NULL;
522
523 if (PQgetResult(conn) != NULL)
524 pg_fatal("PQgetResult returned something extra after first result");
525
526 if (PQexitPipelineMode(conn) != 0)
527 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
528
529 res = PQgetResult(conn);
530 if (res == NULL)
531 pg_fatal("PQgetResult returned null when sync result expected: %s",
533
535 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
537 PQclear(res);
538
539 /* second pipeline */
540
541 res = PQgetResult(conn);
542 if (res == NULL)
543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
545
547 pg_fatal("Unexpected result code %s from second pipeline item",
549 PQclear(res);
550 res = NULL;
551
552 if (PQgetResult(conn) != NULL)
553 pg_fatal("PQgetResult returned something extra after first result");
554
555 if (PQexitPipelineMode(conn) != 0)
556 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
557
558 res = PQgetResult(conn);
559 if (res == NULL)
560 pg_fatal("PQgetResult returned null when sync result expected: %s",
562
564 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
566 PQclear(res);
567
568 /* third pipeline */
569
570 res = PQgetResult(conn);
571 if (res == NULL)
572 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
574
576 pg_fatal("Unexpected result code %s from third pipeline item",
578
579 res = PQgetResult(conn);
580 if (res != NULL)
581 pg_fatal("Expected null result, got %s",
583
584 res = PQgetResult(conn);
585 if (res == NULL)
586 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
588
590 pg_fatal("Unexpected result code %s from second pipeline sync",
592
593 /* We're still in pipeline mode ... */
595 pg_fatal("Fell out of pipeline mode somehow");
596
597 /* until we end it, which we can safely do now */
598 if (PQexitPipelineMode(conn) != 1)
599 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
601
603 pg_fatal("exiting pipeline mode didn't seem to work");
604
605 fprintf(stderr, "ok\n");
606}
607
608/*
609 * Test behavior when a pipeline dispatches a number of commands that are
610 * not flushed by a sync point.
611 */
612static void
614{
615 int numqueries = 10;
616 int results = 0;
617 int sock = PQsocket(conn);
618
619 fprintf(stderr, "nosync... ");
620
621 if (sock < 0)
622 pg_fatal("invalid socket");
623
624 if (PQenterPipelineMode(conn) != 1)
625 pg_fatal("could not enter pipeline mode");
626 for (int i = 0; i < numqueries; i++)
627 {
628 fd_set input_mask;
629 struct timeval tv;
630
631 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
632 0, NULL, NULL, NULL, NULL, 0) != 1)
633 pg_fatal("error sending select: %s", PQerrorMessage(conn));
634 PQflush(conn);
635
636 /*
637 * If the server has written anything to us, read (some of) it now.
638 */
639 FD_ZERO(&input_mask);
640 FD_SET(sock, &input_mask);
641 tv.tv_sec = 0;
642 tv.tv_usec = 0;
643 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
644 {
645 fprintf(stderr, "select() failed: %m\n");
647 }
648 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
649 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
650 }
651
652 /* tell server to flush its output buffer */
653 if (PQsendFlushRequest(conn) != 1)
654 pg_fatal("failed to send flush request");
655 PQflush(conn);
656
657 /* Now read all results */
658 for (;;)
659 {
660 PGresult *res;
661
662 res = PQgetResult(conn);
663
664 /* NULL results are only expected after TUPLES_OK */
665 if (res == NULL)
666 pg_fatal("got unexpected NULL result after %d results", results);
667
668 /* We expect exactly one TUPLES_OK result for each query we sent */
670 {
671 PGresult *res2;
672
673 /* and one NULL result should follow each */
674 res2 = PQgetResult(conn);
675 if (res2 != NULL)
676 pg_fatal("expected NULL, got %s",
678 PQclear(res);
679 results++;
680
681 /* if we're done, we're done */
682 if (results == numqueries)
683 break;
684
685 continue;
686 }
687
688 /* anything else is unexpected */
689 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
690 }
691
692 fprintf(stderr, "ok\n");
693}
694
695/*
696 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
697 * still have to get results for each pipeline item, but the item will just be
698 * a PGRES_PIPELINE_ABORTED code.
699 *
700 * This intentionally doesn't use a transaction to wrap the pipeline. You should
701 * usually use an xact, but in this case we want to observe the effects of each
702 * statement.
703 */
704static void
706{
707 PGresult *res = NULL;
708 const char *dummy_params[1] = {"1"};
709 Oid dummy_param_oids[1] = {INT4OID};
710 int i;
711 int gotrows;
712 bool goterror;
713
714 fprintf(stderr, "aborted pipeline... ");
715
716 res = PQexec(conn, drop_table_sql);
718 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
719
722 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
723
724 /*
725 * Queue up a couple of small pipelines and process each without returning
726 * to command mode first. Make sure the second operation in the first
727 * pipeline ERRORs.
728 */
729 if (PQenterPipelineMode(conn) != 1)
730 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
731
732 dummy_params[0] = "1";
733 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
734 dummy_params, NULL, NULL, 0) != 1)
735 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
736
737 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
738 1, dummy_param_oids, dummy_params,
739 NULL, NULL, 0) != 1)
740 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
741
742 dummy_params[0] = "2";
743 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
744 dummy_params, NULL, NULL, 0) != 1)
745 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
746
747 if (PQpipelineSync(conn) != 1)
748 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
749
750 dummy_params[0] = "3";
751 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
752 dummy_params, NULL, NULL, 0) != 1)
753 pg_fatal("dispatching second-pipeline insert failed: %s",
755
756 if (PQpipelineSync(conn) != 1)
757 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
758
759 /*
760 * OK, start processing the pipeline results.
761 *
762 * We should get a command-ok for the first query, then a fatal error and
763 * a pipeline aborted message for the second insert, a pipeline-end, then
764 * a command-ok and a pipeline-ok for the second pipeline operation.
765 */
766 res = PQgetResult(conn);
767 if (res == NULL)
768 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
770 pg_fatal("Unexpected result status %s: %s",
773 PQclear(res);
774
775 /* NULL result to signal end-of-results for this command */
776 if ((res = PQgetResult(conn)) != NULL)
777 pg_fatal("Expected null result, got %s",
779
780 /* Second query caused error, so we expect an error next */
781 res = PQgetResult(conn);
782 if (res == NULL)
783 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
785 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
787 PQclear(res);
788
789 /* NULL result to signal end-of-results for this command */
790 if ((res = PQgetResult(conn)) != NULL)
791 pg_fatal("Expected null result, got %s",
793
794 /*
795 * pipeline should now be aborted.
796 *
797 * Note that we could still queue more queries at this point if we wanted;
798 * they'd get added to a new third pipeline since we've already sent a
799 * second. The aborted flag relates only to the pipeline being received.
800 */
802 pg_fatal("pipeline should be flagged as aborted but isn't");
803
804 /* third query in pipeline, the second insert */
805 res = PQgetResult(conn);
806 if (res == NULL)
807 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
809 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
811 PQclear(res);
812
813 /* NULL result to signal end-of-results for this command */
814 if ((res = PQgetResult(conn)) != NULL)
815 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
816
818 pg_fatal("pipeline should be flagged as aborted but isn't");
819
820 /* Ensure we're still in pipeline */
822 pg_fatal("Fell out of pipeline mode somehow");
823
824 /*
825 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
826 *
827 * (This is so clients know to start processing results normally again and
828 * can tell the difference between skipped commands and the sync.)
829 */
830 res = PQgetResult(conn);
831 if (res == NULL)
832 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
834 pg_fatal("Unexpected result code from first pipeline sync\n"
835 "Expected PGRES_PIPELINE_SYNC, got %s",
837 PQclear(res);
838
840 pg_fatal("sync should've cleared the aborted flag but didn't");
841
842 /* We're still in pipeline mode... */
844 pg_fatal("Fell out of pipeline mode somehow");
845
846 /* the insert from the second pipeline */
847 res = PQgetResult(conn);
848 if (res == NULL)
849 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
851 pg_fatal("Unexpected result code %s from first item in second pipeline",
853 PQclear(res);
854
855 /* Read the NULL result at the end of the command */
856 if ((res = PQgetResult(conn)) != NULL)
857 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
858
859 /* the second pipeline sync */
860 if ((res = PQgetResult(conn)) == NULL)
861 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
863 pg_fatal("Unexpected result code %s from second pipeline sync",
865 PQclear(res);
866
867 if ((res = PQgetResult(conn)) != NULL)
868 pg_fatal("Expected null result, got %s: %s",
871
872 /* Try to send two queries in one command */
873 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
874 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
875 if (PQpipelineSync(conn) != 1)
876 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
877 goterror = false;
878 while ((res = PQgetResult(conn)) != NULL)
879 {
880 switch (PQresultStatus(res))
881 {
883 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
884 pg_fatal("expected error about multiple commands, got %s",
886 printf("got expected %s", PQerrorMessage(conn));
887 goterror = true;
888 break;
889 default:
890 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
891 break;
892 }
893 }
894 if (!goterror)
895 pg_fatal("did not get cannot-insert-multiple-commands error");
896 res = PQgetResult(conn);
897 if (res == NULL)
898 pg_fatal("got NULL result");
900 pg_fatal("Unexpected result code %s from pipeline sync",
902 fprintf(stderr, "ok\n");
903
904 /* Test single-row mode with an error partways */
905 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
906 0, NULL, NULL, NULL, NULL, 0) != 1)
907 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
908 if (PQpipelineSync(conn) != 1)
909 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
911 goterror = false;
912 gotrows = 0;
913 while ((res = PQgetResult(conn)) != NULL)
914 {
915 switch (PQresultStatus(res))
916 {
918 printf("got row: %s\n", PQgetvalue(res, 0, 0));
919 gotrows++;
920 break;
922 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
923 pg_fatal("expected division-by-zero, got: %s (%s)",
926 printf("got expected division-by-zero\n");
927 goterror = true;
928 break;
929 default:
930 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
931 }
932 PQclear(res);
933 }
934 if (!goterror)
935 pg_fatal("did not get division-by-zero error");
936 if (gotrows != 3)
937 pg_fatal("did not get three rows");
938 /* the third pipeline sync */
939 if ((res = PQgetResult(conn)) == NULL)
940 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
942 pg_fatal("Unexpected result code %s from third pipeline sync",
944 PQclear(res);
945
946 /* We're still in pipeline mode... */
948 pg_fatal("Fell out of pipeline mode somehow");
949
950 /* until we end it, which we can safely do now */
951 if (PQexitPipelineMode(conn) != 1)
952 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
954
956 pg_fatal("exiting pipeline mode didn't seem to work");
957
958 /*-
959 * Since we fired the pipelines off without a surrounding xact, the results
960 * should be:
961 *
962 * - Implicit xact started by server around 1st pipeline
963 * - First insert applied
964 * - Second statement aborted xact
965 * - Third insert skipped
966 * - Sync rolled back first implicit xact
967 * - Implicit xact created by server around 2nd pipeline
968 * - insert applied from 2nd pipeline
969 * - Sync commits 2nd xact
970 *
971 * So we should only have the value 3 that we inserted.
972 */
973 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
974
976 pg_fatal("Expected tuples, got %s: %s",
978 if (PQntuples(res) != 1)
979 pg_fatal("expected 1 result, got %d", PQntuples(res));
980 for (i = 0; i < PQntuples(res); i++)
981 {
982 const char *val = PQgetvalue(res, i, 0);
983
984 if (strcmp(val, "3") != 0)
985 pg_fatal("expected only insert with value 3, got %s", val);
986 }
987
988 PQclear(res);
989
990 fprintf(stderr, "ok\n");
991}
992
993/* State machine enum for test_pipelined_insert */
995{
1004};
1005
1006static void
1008{
1009 Oid insert_param_oids[2] = {INT4OID, INT8OID};
1010 const char *insert_params[2];
1011 char insert_param_0[MAXINTLEN];
1012 char insert_param_1[MAXINT8LEN];
1013 enum PipelineInsertStep send_step = BI_BEGIN_TX,
1014 recv_step = BI_BEGIN_TX;
1015 int rows_to_send,
1016 rows_to_receive;
1017
1018 insert_params[0] = insert_param_0;
1019 insert_params[1] = insert_param_1;
1020
1021 rows_to_send = rows_to_receive = n_rows;
1022
1023 /*
1024 * Do a pipelined insert into a table created at the start of the pipeline
1025 */
1026 if (PQenterPipelineMode(conn) != 1)
1027 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1028
1029 while (send_step != BI_PREPARE)
1030 {
1031 const char *sql;
1032
1033 switch (send_step)
1034 {
1035 case BI_BEGIN_TX:
1036 sql = "BEGIN TRANSACTION";
1037 send_step = BI_DROP_TABLE;
1038 break;
1039
1040 case BI_DROP_TABLE:
1041 sql = drop_table_sql;
1042 send_step = BI_CREATE_TABLE;
1043 break;
1044
1045 case BI_CREATE_TABLE:
1046 sql = create_table_sql;
1047 send_step = BI_PREPARE;
1048 break;
1049
1050 default:
1051 pg_fatal("invalid state");
1052 sql = NULL; /* keep compiler quiet */
1053 }
1054
1055 pg_debug("sending: %s\n", sql);
1056 if (PQsendQueryParams(conn, sql,
1057 0, NULL, NULL, NULL, NULL, 0) != 1)
1058 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1059 }
1060
1061 Assert(send_step == BI_PREPARE);
1062 pg_debug("sending: %s\n", insert_sql2);
1063 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1064 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1065 send_step = BI_INSERT_ROWS;
1066
1067 /*
1068 * Now we start inserting. We'll be sending enough data that we could fill
1069 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1070 * mode and consume input while we send more output. As results of each
1071 * query are processed we should pop them to allow processing of the next
1072 * query. There's no need to finish the pipeline before processing
1073 * results.
1074 */
1075 if (PQsetnonblocking(conn, 1) != 0)
1076 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1077
1078 while (recv_step != BI_DONE)
1079 {
1080 int sock;
1081 fd_set input_mask;
1082 fd_set output_mask;
1083
1084 sock = PQsocket(conn);
1085
1086 if (sock < 0)
1087 break; /* shouldn't happen */
1088
1089 FD_ZERO(&input_mask);
1090 FD_SET(sock, &input_mask);
1091 FD_ZERO(&output_mask);
1092 FD_SET(sock, &output_mask);
1093
1094 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1095 {
1096 fprintf(stderr, "select() failed: %m\n");
1098 }
1099
1100 /*
1101 * Process any results, so we keep the server's output buffer free
1102 * flowing and it can continue to process input
1103 */
1104 if (FD_ISSET(sock, &input_mask))
1105 {
1107
1108 /* Read until we'd block if we tried to read */
1109 while (!PQisBusy(conn) && recv_step < BI_DONE)
1110 {
1111 PGresult *res;
1112 const char *cmdtag = "";
1113 const char *description = "";
1114 int status;
1115
1116 /*
1117 * Read next result. If no more results from this query,
1118 * advance to the next query
1119 */
1120 res = PQgetResult(conn);
1121 if (res == NULL)
1122 continue;
1123
1124 status = PGRES_COMMAND_OK;
1125 switch (recv_step)
1126 {
1127 case BI_BEGIN_TX:
1128 cmdtag = "BEGIN";
1129 recv_step++;
1130 break;
1131 case BI_DROP_TABLE:
1132 cmdtag = "DROP TABLE";
1133 recv_step++;
1134 break;
1135 case BI_CREATE_TABLE:
1136 cmdtag = "CREATE TABLE";
1137 recv_step++;
1138 break;
1139 case BI_PREPARE:
1140 cmdtag = "";
1141 description = "PREPARE";
1142 recv_step++;
1143 break;
1144 case BI_INSERT_ROWS:
1145 cmdtag = "INSERT";
1146 rows_to_receive--;
1147 if (rows_to_receive == 0)
1148 recv_step++;
1149 break;
1150 case BI_COMMIT_TX:
1151 cmdtag = "COMMIT";
1152 recv_step++;
1153 break;
1154 case BI_SYNC:
1155 cmdtag = "";
1156 description = "SYNC";
1157 status = PGRES_PIPELINE_SYNC;
1158 recv_step++;
1159 break;
1160 case BI_DONE:
1161 /* unreachable */
1162 pg_fatal("unreachable state");
1163 }
1164
1165 if (PQresultStatus(res) != status)
1166 pg_fatal("%s reported status %s, expected %s\n"
1167 "Error message: \"%s\"",
1169 PQresStatus(status), PQerrorMessage(conn));
1170
1171 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1172 pg_fatal("%s expected command tag '%s', got '%s'",
1173 description, cmdtag, PQcmdStatus(res));
1174
1175 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1176
1177 PQclear(res);
1178 }
1179 }
1180
1181 /* Write more rows and/or the end pipeline message, if needed */
1182 if (FD_ISSET(sock, &output_mask))
1183 {
1184 PQflush(conn);
1185
1186 if (send_step == BI_INSERT_ROWS)
1187 {
1188 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1189 /* use up some buffer space with a wide value */
1190 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1191
1192 if (PQsendQueryPrepared(conn, "my_insert",
1193 2, insert_params, NULL, NULL, 0) == 1)
1194 {
1195 pg_debug("sent row %d\n", rows_to_send);
1196
1197 rows_to_send--;
1198 if (rows_to_send == 0)
1199 send_step++;
1200 }
1201 else
1202 {
1203 /*
1204 * in nonblocking mode, so it's OK for an insert to fail
1205 * to send
1206 */
1207 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1208 rows_to_send, PQerrorMessage(conn));
1209 }
1210 }
1211 else if (send_step == BI_COMMIT_TX)
1212 {
1213 if (PQsendQueryParams(conn, "COMMIT",
1214 0, NULL, NULL, NULL, NULL, 0) == 1)
1215 {
1216 pg_debug("sent COMMIT\n");
1217 send_step++;
1218 }
1219 else
1220 {
1221 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223 }
1224 }
1225 else if (send_step == BI_SYNC)
1226 {
1227 if (PQpipelineSync(conn) == 1)
1228 {
1229 fprintf(stdout, "pipeline sync sent\n");
1230 send_step++;
1231 }
1232 else
1233 {
1234 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236 }
1237 }
1238 }
1239 }
1240
1241 /* We've got the sync message and the pipeline should be done */
1242 if (PQexitPipelineMode(conn) != 1)
1243 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245
1246 if (PQsetnonblocking(conn, 0) != 0)
1247 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1248
1249 fprintf(stderr, "ok\n");
1250}
1251
1252static void
1254{
1255 PGresult *res = NULL;
1256 Oid param_oids[1] = {INT4OID};
1257 Oid expected_oids[4];
1258 Oid typ;
1259
1260 fprintf(stderr, "prepared... ");
1261
1262 if (PQenterPipelineMode(conn) != 1)
1263 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1264 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1265 "interval '1 sec'",
1266 1, param_oids) != 1)
1267 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1268 expected_oids[0] = INT4OID;
1269 expected_oids[1] = TEXTOID;
1270 expected_oids[2] = NUMERICOID;
1271 expected_oids[3] = INTERVALOID;
1272 if (PQsendDescribePrepared(conn, "select_one") != 1)
1273 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1274 if (PQpipelineSync(conn) != 1)
1275 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1276
1277 res = PQgetResult(conn);
1278 if (res == NULL)
1279 pg_fatal("PQgetResult returned null");
1280 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1281 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1282 PQclear(res);
1283 res = PQgetResult(conn);
1284 if (res != NULL)
1285 pg_fatal("expected NULL result");
1286
1287 res = PQgetResult(conn);
1288 if (res == NULL)
1289 pg_fatal("PQgetResult returned NULL");
1290 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1291 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1292 if (PQnfields(res) != lengthof(expected_oids))
1293 pg_fatal("expected %zu columns, got %d",
1294 lengthof(expected_oids), PQnfields(res));
1295 for (int i = 0; i < PQnfields(res); i++)
1296 {
1297 typ = PQftype(res, i);
1298 if (typ != expected_oids[i])
1299 pg_fatal("field %d: expected type %u, got %u",
1300 i, expected_oids[i], typ);
1301 }
1302 PQclear(res);
1303 res = PQgetResult(conn);
1304 if (res != NULL)
1305 pg_fatal("expected NULL result");
1306
1307 res = PQgetResult(conn);
1309 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1310
1311 fprintf(stderr, "closing statement..");
1312 if (PQsendClosePrepared(conn, "select_one") != 1)
1313 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1314 if (PQpipelineSync(conn) != 1)
1315 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1316
1317 res = PQgetResult(conn);
1318 if (res == NULL)
1319 pg_fatal("expected non-NULL result");
1320 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1321 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1322 PQclear(res);
1323 res = PQgetResult(conn);
1324 if (res != NULL)
1325 pg_fatal("expected NULL result");
1326 res = PQgetResult(conn);
1328 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1329
1330 if (PQexitPipelineMode(conn) != 1)
1331 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1332
1333 /* Now that it's closed we should get an error when describing */
1334 res = PQdescribePrepared(conn, "select_one");
1336 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1337
1338 /*
1339 * Also test the blocking close, this should not fail since closing a
1340 * non-existent prepared statement is a no-op
1341 */
1342 res = PQclosePrepared(conn, "select_one");
1343 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1344 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1345
1346 fprintf(stderr, "creating portal... ");
1347 PQexec(conn, "BEGIN");
1348 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1350 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1351 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1352 if (PQpipelineSync(conn) != 1)
1353 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1354 res = PQgetResult(conn);
1355 if (res == NULL)
1356 pg_fatal("PQgetResult returned null");
1357 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1358 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1359
1360 typ = PQftype(res, 0);
1361 if (typ != INT4OID)
1362 pg_fatal("portal: expected type %u, got %u",
1363 INT4OID, typ);
1364 PQclear(res);
1365 res = PQgetResult(conn);
1366 if (res != NULL)
1367 pg_fatal("expected NULL result");
1368 res = PQgetResult(conn);
1370 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1371
1372 fprintf(stderr, "closing portal... ");
1373 if (PQsendClosePortal(conn, "cursor_one") != 1)
1374 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1375 if (PQpipelineSync(conn) != 1)
1376 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1377
1378 res = PQgetResult(conn);
1379 if (res == NULL)
1380 pg_fatal("expected non-NULL result");
1381 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1382 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1383 PQclear(res);
1384 res = PQgetResult(conn);
1385 if (res != NULL)
1386 pg_fatal("expected NULL result");
1387 res = PQgetResult(conn);
1389 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1390
1391 if (PQexitPipelineMode(conn) != 1)
1392 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1393
1394 /* Now that it's closed we should get an error when describing */
1395 res = PQdescribePortal(conn, "cursor_one");
1397 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1398
1399 /*
1400 * Also test the blocking close, this should not fail since closing a
1401 * non-existent portal is a no-op
1402 */
1403 res = PQclosePortal(conn, "cursor_one");
1404 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1405 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1406
1407 fprintf(stderr, "ok\n");
1408}
1409
1410/*
1411 * Test max_protocol_version options.
1412 */
1413static void
1415{
1416 const char **keywords;
1417 const char **vals;
1418 int nopts;
1420 int protocol_version;
1421 int max_protocol_version_index;
1422 int i;
1423
1424 /*
1425 * Prepare keywords/vals arrays, copied from the existing connection, with
1426 * an extra slot for 'max_protocol_version'.
1427 */
1428 nopts = 0;
1429 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1430 nopts++;
1431 nopts++; /* max_protocol_version */
1432 nopts++; /* NULL terminator */
1433
1434 keywords = pg_malloc0(sizeof(char *) * nopts);
1435 vals = pg_malloc0(sizeof(char *) * nopts);
1436
1437 i = 0;
1438 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1439 {
1440 if (opt->val)
1441 {
1442 keywords[i] = opt->keyword;
1443 vals[i] = opt->val;
1444 i++;
1445 }
1446 }
1447
1448 max_protocol_version_index = i;
1449 keywords[i] = "max_protocol_version"; /* value is filled in below */
1450 i++;
1451 keywords[i] = vals[i] = NULL;
1452
1453 /*
1454 * Test max_protocol_version=3.0
1455 */
1456 vals[max_protocol_version_index] = "3.0";
1457 conn = PQconnectdbParams(keywords, vals, false);
1458
1459 if (PQstatus(conn) != CONNECTION_OK)
1460 pg_fatal("Connection to database failed: %s",
1462
1463 protocol_version = PQfullProtocolVersion(conn);
1464 if (protocol_version != 30000)
1465 pg_fatal("expected 30000, got %d", protocol_version);
1466
1467 PQfinish(conn);
1468
1469 /*
1470 * Test max_protocol_version=3.1. It's not valid, we went straight from
1471 * 3.0 to 3.2.
1472 */
1473 vals[max_protocol_version_index] = "3.1";
1474 conn = PQconnectdbParams(keywords, vals, false);
1475
1477 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1478
1479 PQfinish(conn);
1480
1481 /*
1482 * Test max_protocol_version=3.2
1483 */
1484 vals[max_protocol_version_index] = "3.2";
1485 conn = PQconnectdbParams(keywords, vals, false);
1486
1487 if (PQstatus(conn) != CONNECTION_OK)
1488 pg_fatal("Connection to database failed: %s",
1490
1491 protocol_version = PQfullProtocolVersion(conn);
1492 if (protocol_version != 30002)
1493 pg_fatal("expected 30002, got %d", protocol_version);
1494
1495 PQfinish(conn);
1496
1497 /*
1498 * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1499 */
1500 vals[max_protocol_version_index] = "latest";
1501 conn = PQconnectdbParams(keywords, vals, false);
1502
1503 if (PQstatus(conn) != CONNECTION_OK)
1504 pg_fatal("Connection to database failed: %s",
1506
1507 protocol_version = PQfullProtocolVersion(conn);
1508 if (protocol_version != 30002)
1509 pg_fatal("expected 30002, got %d", protocol_version);
1510
1511 PQfinish(conn);
1512}
1513
1514/* Notice processor: print notices, and count how many we got */
1515static void
1516notice_processor(void *arg, const char *message)
1517{
1518 int *n_notices = (int *) arg;
1519
1520 (*n_notices)++;
1521 fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1522}
1523
1524/* Verify behavior in "idle" state */
1525static void
1527{
1528 PGresult *res;
1529 int n_notices = 0;
1530
1531 fprintf(stderr, "\npipeline idle...\n");
1532
1534
1535 /* Try to exit pipeline mode in pipeline-idle state */
1536 if (PQenterPipelineMode(conn) != 1)
1537 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1538 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1539 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1541 res = PQgetResult(conn);
1542 if (res == NULL)
1543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1545 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1546 pg_fatal("unexpected result code %s from first pipeline item",
1548 PQclear(res);
1549 res = PQgetResult(conn);
1550 if (res != NULL)
1551 pg_fatal("did not receive terminating NULL");
1552 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1553 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1554 if (PQexitPipelineMode(conn) == 1)
1555 pg_fatal("exiting pipeline succeeded when it shouldn't");
1556 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1557 strlen("cannot exit pipeline mode")) != 0)
1558 pg_fatal("did not get expected error; got: %s",
1561 res = PQgetResult(conn);
1562 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1563 pg_fatal("unexpected result code %s from second pipeline item",
1565 PQclear(res);
1566 res = PQgetResult(conn);
1567 if (res != NULL)
1568 pg_fatal("did not receive terminating NULL");
1569 if (PQexitPipelineMode(conn) != 1)
1570 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1571
1572 if (n_notices > 0)
1573 pg_fatal("got %d notice(s)", n_notices);
1574 fprintf(stderr, "ok - 1\n");
1575
1576 /* Have a WARNING in the middle of a resultset */
1577 if (PQenterPipelineMode(conn) != 1)
1578 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1579 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1580 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1582 res = PQgetResult(conn);
1583 if (res == NULL)
1584 pg_fatal("unexpected NULL result received");
1585 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1586 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1587 if (PQexitPipelineMode(conn) != 1)
1588 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1589 fprintf(stderr, "ok - 2\n");
1590}
1591
1592static void
1594{
1595 PGresult *res = NULL;
1596 const char *dummy_params[1] = {"1"};
1597 Oid dummy_param_oids[1] = {INT4OID};
1598
1599 fprintf(stderr, "simple pipeline... ");
1600
1601 /*
1602 * Enter pipeline mode and dispatch a set of operations, which we'll then
1603 * process the results of as they come in.
1604 *
1605 * For a simple case we should be able to do this without interim
1606 * processing of results since our output buffer will give us enough slush
1607 * to work with and we won't block on sending. So blocking mode is fine.
1608 */
1609 if (PQisnonblocking(conn))
1610 pg_fatal("Expected blocking connection mode");
1611
1612 if (PQenterPipelineMode(conn) != 1)
1613 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1614
1615 if (PQsendQueryParams(conn, "SELECT $1",
1616 1, dummy_param_oids, dummy_params,
1617 NULL, NULL, 0) != 1)
1618 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1619
1620 if (PQexitPipelineMode(conn) != 0)
1621 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1622
1623 if (PQpipelineSync(conn) != 1)
1624 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1625
1626 res = PQgetResult(conn);
1627 if (res == NULL)
1628 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1630
1631 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1632 pg_fatal("Unexpected result code %s from first pipeline item",
1634
1635 PQclear(res);
1636 res = NULL;
1637
1638 if (PQgetResult(conn) != NULL)
1639 pg_fatal("PQgetResult returned something extra after first query result.");
1640
1641 /*
1642 * Even though we've processed the result there's still a sync to come and
1643 * we can't exit pipeline mode yet
1644 */
1645 if (PQexitPipelineMode(conn) != 0)
1646 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1647
1648 res = PQgetResult(conn);
1649 if (res == NULL)
1650 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1652
1654 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1656
1657 PQclear(res);
1658 res = NULL;
1659
1660 if (PQgetResult(conn) != NULL)
1661 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1663
1664 /* We're still in pipeline mode... */
1666 pg_fatal("Fell out of pipeline mode somehow");
1667
1668 /* ... until we end it, which we can safely do now */
1669 if (PQexitPipelineMode(conn) != 1)
1670 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1672
1674 pg_fatal("Exiting pipeline mode didn't seem to work");
1675
1676 fprintf(stderr, "ok\n");
1677}
1678
1679static void
1681{
1682 PGresult *res;
1683 int i;
1684 bool pipeline_ended = false;
1685
1686 if (PQenterPipelineMode(conn) != 1)
1687 pg_fatal("failed to enter pipeline mode: %s",
1689
1690 /* One series of three commands, using single-row mode for the first two. */
1691 for (i = 0; i < 3; i++)
1692 {
1693 char *param[1];
1694
1695 param[0] = psprintf("%d", 44 + i);
1696
1698 "SELECT generate_series(42, $1)",
1699 1,
1700 NULL,
1701 (const char **) param,
1702 NULL,
1703 NULL,
1704 0) != 1)
1705 pg_fatal("failed to send query: %s",
1707 pfree(param[0]);
1708 }
1709 if (PQpipelineSync(conn) != 1)
1710 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1711
1712 for (i = 0; !pipeline_ended; i++)
1713 {
1714 bool first = true;
1715 bool saw_ending_tuplesok;
1716 bool isSingleTuple = false;
1717
1718 /* Set single row mode for only first 2 SELECT queries */
1719 if (i < 2)
1720 {
1721 if (PQsetSingleRowMode(conn) != 1)
1722 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1723 }
1724
1725 /* Consume rows for this query */
1726 saw_ending_tuplesok = false;
1727 while ((res = PQgetResult(conn)) != NULL)
1728 {
1729 ExecStatusType est = PQresultStatus(res);
1730
1731 if (est == PGRES_PIPELINE_SYNC)
1732 {
1733 fprintf(stderr, "end of pipeline reached\n");
1734 pipeline_ended = true;
1735 PQclear(res);
1736 if (i != 3)
1737 pg_fatal("Expected three results, got %d", i);
1738 break;
1739 }
1740
1741 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1742 if (first)
1743 {
1744 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1745 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1746 i, PQresStatus(est));
1747 if (i >= 2 && est != PGRES_TUPLES_OK)
1748 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1749 i, PQresStatus(est));
1750 first = false;
1751 }
1752
1753 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1754 switch (est)
1755 {
1756 case PGRES_TUPLES_OK:
1757 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1758 saw_ending_tuplesok = true;
1759 if (isSingleTuple)
1760 {
1761 if (PQntuples(res) == 0)
1762 fprintf(stderr, "all tuples received in query %d\n", i);
1763 else
1764 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1765 }
1766 break;
1767
1768 case PGRES_SINGLE_TUPLE:
1769 isSingleTuple = true;
1770 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1771 break;
1772
1773 default:
1774 pg_fatal("unexpected");
1775 }
1776 PQclear(res);
1777 }
1778 if (!pipeline_ended && !saw_ending_tuplesok)
1779 pg_fatal("didn't get expected terminating TUPLES_OK");
1780 }
1781
1782 /*
1783 * Now issue one command, get its results in with single-row mode, then
1784 * issue another command, and get its results in normal mode; make sure
1785 * the single-row mode flag is reset as expected.
1786 */
1787 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1788 0, NULL, NULL, NULL, NULL, 0) != 1)
1789 pg_fatal("failed to send query: %s",
1791 if (PQsendFlushRequest(conn) != 1)
1792 pg_fatal("failed to send flush request");
1793 if (PQsetSingleRowMode(conn) != 1)
1794 pg_fatal("PQsetSingleRowMode() failed");
1795 res = PQgetResult(conn);
1796 if (res == NULL)
1797 pg_fatal("unexpected NULL");
1799 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1801 res = PQgetResult(conn);
1802 if (res == NULL)
1803 pg_fatal("unexpected NULL");
1804 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1805 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1807 if (PQgetResult(conn) != NULL)
1808 pg_fatal("expected NULL result");
1809
1810 if (PQsendQueryParams(conn, "SELECT 1",
1811 0, NULL, NULL, NULL, NULL, 0) != 1)
1812 pg_fatal("failed to send query: %s",
1814 if (PQsendFlushRequest(conn) != 1)
1815 pg_fatal("failed to send flush request");
1816 res = PQgetResult(conn);
1817 if (res == NULL)
1818 pg_fatal("unexpected NULL");
1819 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1820 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1822 if (PQgetResult(conn) != NULL)
1823 pg_fatal("expected NULL result");
1824
1825 /*
1826 * Try chunked mode as well; make sure that it correctly delivers a
1827 * partial final chunk.
1828 */
1829 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1830 0, NULL, NULL, NULL, NULL, 0) != 1)
1831 pg_fatal("failed to send query: %s",
1833 if (PQsendFlushRequest(conn) != 1)
1834 pg_fatal("failed to send flush request");
1835 if (PQsetChunkedRowsMode(conn, 3) != 1)
1836 pg_fatal("PQsetChunkedRowsMode() failed");
1837 res = PQgetResult(conn);
1838 if (res == NULL)
1839 pg_fatal("unexpected NULL");
1841 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1844 if (PQntuples(res) != 3)
1845 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1846 res = PQgetResult(conn);
1847 if (res == NULL)
1848 pg_fatal("unexpected NULL");
1850 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1852 if (PQntuples(res) != 2)
1853 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1854 res = PQgetResult(conn);
1855 if (res == NULL)
1856 pg_fatal("unexpected NULL");
1857 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1858 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1860 if (PQntuples(res) != 0)
1861 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1862 if (PQgetResult(conn) != NULL)
1863 pg_fatal("expected NULL result");
1864
1865 if (PQexitPipelineMode(conn) != 1)
1866 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1867
1868 fprintf(stderr, "ok\n");
1869}
1870
1871/*
1872 * Simple test to verify that a pipeline is discarded as a whole when there's
1873 * an error, ignoring transaction commands.
1874 */
1875static void
1877{
1878 PGresult *res;
1879 bool expect_null;
1880 int num_syncs = 0;
1881
1882 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1883 "CREATE TABLE pq_pipeline_tst (id int)");
1884 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1885 pg_fatal("failed to create test table: %s",
1887 PQclear(res);
1888
1889 if (PQenterPipelineMode(conn) != 1)
1890 pg_fatal("failed to enter pipeline mode: %s",
1892 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1893 pg_fatal("could not send prepare on pipeline: %s",
1895
1897 "BEGIN",
1898 0, NULL, NULL, NULL, NULL, 0) != 1)
1899 pg_fatal("failed to send query: %s",
1902 "SELECT 0/0",
1903 0, NULL, NULL, NULL, NULL, 0) != 1)
1904 pg_fatal("failed to send query: %s",
1906
1907 /*
1908 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1909 * get out of the pipeline-aborted state first.
1910 */
1911 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1912 pg_fatal("failed to execute prepared: %s",
1914
1915 /* This insert fails because we're in pipeline-aborted state */
1917 "INSERT INTO pq_pipeline_tst VALUES (1)",
1918 0, NULL, NULL, NULL, NULL, 0) != 1)
1919 pg_fatal("failed to send query: %s",
1921 if (PQpipelineSync(conn) != 1)
1922 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1923 num_syncs++;
1924
1925 /*
1926 * This insert fails even though the pipeline got a SYNC, because we're in
1927 * an aborted transaction
1928 */
1930 "INSERT INTO pq_pipeline_tst VALUES (2)",
1931 0, NULL, NULL, NULL, NULL, 0) != 1)
1932 pg_fatal("failed to send query: %s",
1934 if (PQpipelineSync(conn) != 1)
1935 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1936 num_syncs++;
1937
1938 /*
1939 * Send ROLLBACK using prepared stmt. This one works because we just did
1940 * PQpipelineSync above.
1941 */
1942 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1943 pg_fatal("failed to execute prepared: %s",
1945
1946 /*
1947 * Now that we're out of a transaction and in pipeline-good mode, this
1948 * insert works
1949 */
1951 "INSERT INTO pq_pipeline_tst VALUES (3)",
1952 0, NULL, NULL, NULL, NULL, 0) != 1)
1953 pg_fatal("failed to send query: %s",
1955 /* Send two syncs now -- match up to SYNC messages below */
1956 if (PQpipelineSync(conn) != 1)
1957 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1958 num_syncs++;
1959 if (PQpipelineSync(conn) != 1)
1960 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1961 num_syncs++;
1962
1963 expect_null = false;
1964 for (int i = 0;; i++)
1965 {
1966 ExecStatusType restype;
1967
1968 res = PQgetResult(conn);
1969 if (res == NULL)
1970 {
1971 printf("%d: got NULL result\n", i);
1972 if (!expect_null)
1973 pg_fatal("did not expect NULL here");
1974 expect_null = false;
1975 continue;
1976 }
1977 restype = PQresultStatus(res);
1978 printf("%d: got status %s", i, PQresStatus(restype));
1979 if (expect_null)
1980 pg_fatal("expected NULL");
1981 if (restype == PGRES_FATAL_ERROR)
1982 printf("; error: %s", PQerrorMessage(conn));
1983 else if (restype == PGRES_PIPELINE_ABORTED)
1984 {
1985 printf(": command didn't run because pipeline aborted\n");
1986 }
1987 else
1988 printf("\n");
1989 PQclear(res);
1990
1991 if (restype == PGRES_PIPELINE_SYNC)
1992 num_syncs--;
1993 else
1994 expect_null = true;
1995 if (num_syncs <= 0)
1996 break;
1997 }
1998 if (PQgetResult(conn) != NULL)
1999 pg_fatal("returned something extra after all the syncs: %s",
2001
2002 if (PQexitPipelineMode(conn) != 1)
2003 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
2004
2005 /* We expect to find one tuple containing the value "3" */
2006 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
2007 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2008 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
2009 if (PQntuples(res) != 1)
2010 pg_fatal("did not get 1 tuple");
2011 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
2012 pg_fatal("did not get expected tuple");
2013 PQclear(res);
2014
2015 fprintf(stderr, "ok\n");
2016}
2017
2018/*
2019 * In this test mode we send a stream of queries, with one in the middle
2020 * causing an error. Verify that we can still send some more after the
2021 * error and have libpq work properly.
2022 */
2023static void
2025{
2026 int sock = PQsocket(conn);
2027 PGresult *res;
2028 Oid paramTypes[2] = {INT8OID, INT8OID};
2029 const char *paramValues[2];
2030 char paramValue0[MAXINT8LEN];
2031 char paramValue1[MAXINT8LEN];
2032 int ctr = 0;
2033 int numsent = 0;
2034 int results = 0;
2035 bool read_done = false;
2036 bool write_done = false;
2037 bool error_sent = false;
2038 bool got_error = false;
2039 int switched = 0;
2040 int socketful = 0;
2041 fd_set in_fds;
2042 fd_set out_fds;
2043
2044 fprintf(stderr, "uniqviol ...");
2045
2047
2048 paramValues[0] = paramValue0;
2049 paramValues[1] = paramValue1;
2050 sprintf(paramValue1, "42");
2051
2052 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
2053 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
2054 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2055 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
2056
2057 res = PQexec(conn, "begin");
2058 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2059 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
2060
2061 res = PQprepare(conn, "insertion",
2062 "insert into ppln_uniqviol values ($1, $2) returning id",
2063 2, paramTypes);
2064 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
2065 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
2066
2067 if (PQenterPipelineMode(conn) != 1)
2068 pg_fatal("failed to enter pipeline mode");
2069
2070 while (!read_done)
2071 {
2072 /*
2073 * Avoid deadlocks by reading everything the server has sent before
2074 * sending anything. (Special precaution is needed here to process
2075 * PQisBusy before testing the socket for read-readiness, because the
2076 * socket does not turn read-ready after "sending" queries in aborted
2077 * pipeline mode.)
2078 */
2079 while (PQisBusy(conn) == 0)
2080 {
2081 bool new_error;
2082
2083 if (results >= numsent)
2084 {
2085 if (write_done)
2086 read_done = true;
2087 break;
2088 }
2089
2090 res = PQgetResult(conn);
2091 new_error = process_result(conn, res, results, numsent);
2092 if (new_error && got_error)
2093 pg_fatal("got two errors");
2094 got_error |= new_error;
2095 if (results++ >= numsent - 1)
2096 {
2097 if (write_done)
2098 read_done = true;
2099 break;
2100 }
2101 }
2102
2103 if (read_done)
2104 break;
2105
2106 FD_ZERO(&out_fds);
2107 FD_SET(sock, &out_fds);
2108
2109 FD_ZERO(&in_fds);
2110 FD_SET(sock, &in_fds);
2111
2112 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2113 {
2114 if (errno == EINTR)
2115 continue;
2116 pg_fatal("select() failed: %m");
2117 }
2118
2119 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2120 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2121
2122 /*
2123 * If the socket is writable and we haven't finished sending queries,
2124 * send some.
2125 */
2126 if (!write_done && FD_ISSET(sock, &out_fds))
2127 {
2128 for (;;)
2129 {
2130 int flush;
2131
2132 /*
2133 * provoke uniqueness violation exactly once after having
2134 * switched to read mode.
2135 */
2136 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2137 {
2138 sprintf(paramValue0, "%d", numsent / 2);
2139 fprintf(stderr, "E");
2140 error_sent = true;
2141 }
2142 else
2143 {
2144 fprintf(stderr, ".");
2145 sprintf(paramValue0, "%d", ctr++);
2146 }
2147
2148 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2149 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2150 numsent++;
2151
2152 /* Are we done writing? */
2153 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2154 {
2155 if (PQsendFlushRequest(conn) != 1)
2156 pg_fatal("failed to send flush request");
2157 write_done = true;
2158 fprintf(stderr, "\ndone writing\n");
2159 PQflush(conn);
2160 break;
2161 }
2162
2163 /* is the outgoing socket full? */
2164 flush = PQflush(conn);
2165 if (flush == -1)
2166 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2167 if (flush == 1)
2168 {
2169 if (socketful == 0)
2170 socketful = numsent;
2171 fprintf(stderr, "\nswitch to reading\n");
2172 switched++;
2173 break;
2174 }
2175 }
2176 }
2177 }
2178
2179 if (!got_error)
2180 pg_fatal("did not get expected error");
2181
2182 fprintf(stderr, "ok\n");
2183}
2184
2185/*
2186 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2187 * the expected NULL that should follow it.
2188 *
2189 * Returns true if we read a fatal error message, otherwise false.
2190 */
2191static bool
2192process_result(PGconn *conn, PGresult *res, int results, int numsent)
2193{
2194 PGresult *res2;
2195 bool got_error = false;
2196
2197 if (res == NULL)
2198 pg_fatal("got unexpected NULL");
2199
2200 switch (PQresultStatus(res))
2201 {
2202 case PGRES_FATAL_ERROR:
2203 got_error = true;
2204 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2205 PQclear(res);
2206
2207 res2 = PQgetResult(conn);
2208 if (res2 != NULL)
2209 pg_fatal("expected NULL, got %s",
2211 break;
2212
2213 case PGRES_TUPLES_OK:
2214 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2215 PQclear(res);
2216
2217 res2 = PQgetResult(conn);
2218 if (res2 != NULL)
2219 pg_fatal("expected NULL, got %s",
2221 break;
2222
2224 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2225 res2 = PQgetResult(conn);
2226 if (res2 != NULL)
2227 pg_fatal("expected NULL, got %s",
2229 break;
2230
2231 default:
2232 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2233 }
2234
2235 return got_error;
2236}
2237
2238
2239static void
2240usage(const char *progname)
2241{
2242 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2243 fprintf(stderr, "Usage:\n");
2244 fprintf(stderr, " %s [OPTION] tests\n", progname);
2245 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2246 fprintf(stderr, "\nOptions:\n");
2247 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2248 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2249}
2250
2251static void
2253{
2254 printf("cancel\n");
2255 printf("disallowed_in_pipeline\n");
2256 printf("multi_pipelines\n");
2257 printf("nosync\n");
2258 printf("pipeline_abort\n");
2259 printf("pipeline_idle\n");
2260 printf("pipelined_insert\n");
2261 printf("prepared\n");
2262 printf("protocol_version\n");
2263 printf("simple_pipeline\n");
2264 printf("singlerow\n");
2265 printf("transaction\n");
2266 printf("uniqviol\n");
2267}
2268
2269int
2270main(int argc, char **argv)
2271{
2272 const char *conninfo = "";
2273 PGconn *conn;
2274 FILE *trace;
2275 char *testname;
2276 int numrows = 10000;
2277 PGresult *res;
2278 int c;
2279
2280 while ((c = getopt(argc, argv, "r:t:")) != -1)
2281 {
2282 switch (c)
2283 {
2284 case 'r': /* numrows */
2285 errno = 0;
2286 numrows = strtol(optarg, NULL, 10);
2287 if (errno != 0 || numrows <= 0)
2288 {
2289 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2290 optarg);
2291 exit(1);
2292 }
2293 break;
2294 case 't': /* trace file */
2296 break;
2297 }
2298 }
2299
2300 if (optind < argc)
2301 {
2302 testname = pg_strdup(argv[optind]);
2303 optind++;
2304 }
2305 else
2306 {
2307 usage(argv[0]);
2308 exit(1);
2309 }
2310
2311 if (strcmp(testname, "tests") == 0)
2312 {
2314 exit(0);
2315 }
2316
2317 if (optind < argc)
2318 {
2319 conninfo = pg_strdup(argv[optind]);
2320 optind++;
2321 }
2322
2323 /* Make a connection to the database */
2324 conn = PQconnectdb(conninfo);
2325 if (PQstatus(conn) != CONNECTION_OK)
2326 {
2327 fprintf(stderr, "Connection to database failed: %s\n",
2330 }
2331
2332 res = PQexec(conn, "SET lc_messages TO \"C\"");
2333 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2334 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2335 res = PQexec(conn, "SET debug_parallel_query = off");
2336 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2337 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2338
2339 /* Set the trace file, if requested */
2340 if (tracefile != NULL)
2341 {
2342 if (strcmp(tracefile, "-") == 0)
2343 trace = stdout;
2344 else
2345 trace = fopen(tracefile, "w");
2346 if (trace == NULL)
2347 pg_fatal("could not open file \"%s\": %m", tracefile);
2348
2349 /* Make it line-buffered */
2350 setvbuf(trace, NULL, PG_IOLBF, 0);
2351
2352 PQtrace(conn, trace);
2355 }
2356
2357 if (strcmp(testname, "cancel") == 0)
2359 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2361 else if (strcmp(testname, "multi_pipelines") == 0)
2363 else if (strcmp(testname, "nosync") == 0)
2365 else if (strcmp(testname, "pipeline_abort") == 0)
2367 else if (strcmp(testname, "pipeline_idle") == 0)
2369 else if (strcmp(testname, "pipelined_insert") == 0)
2370 test_pipelined_insert(conn, numrows);
2371 else if (strcmp(testname, "prepared") == 0)
2373 else if (strcmp(testname, "protocol_version") == 0)
2375 else if (strcmp(testname, "simple_pipeline") == 0)
2377 else if (strcmp(testname, "singlerow") == 0)
2379 else if (strcmp(testname, "transaction") == 0)
2381 else if (strcmp(testname, "uniqviol") == 0)
2383 else
2384 {
2385 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2386 exit(1);
2387 }
2388
2389 /* close the connection to the database and cleanup */
2390 PQfinish(conn);
2391 return 0;
2392}
#define pg_noreturn
Definition: c.h:165
#define pg_attribute_printf(f, a)
Definition: c.h:233
#define lengthof(array)
Definition: c.h:759
static PGcancel *volatile cancelConn
Definition: cancel.c:43
#define fprintf(file, fmt, msg)
Definition: cubescan.l:21
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:367
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:336
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:68
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:301
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:189
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:530
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:225
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:352
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:725
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:484
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:312
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:324
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:203
int PQfullProtocolVersion(const PGconn *conn)
Definition: fe-connect.c:7599
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:813
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7390
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7556
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5290
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7655
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7663
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7800
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7619
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7645
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:758
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2455
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3073
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2569
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3042
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2521
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2539
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2556
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3282
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2491
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3944
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1536
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2474
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1965
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3272
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2504
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3419
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3371
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3983
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
Assert(PointerIsAligned(start, uint64))
long val
Definition: informix.c:689
static struct @165 value
int i
Definition: isn.c:77
static const JsonPathKeyword keywords[]
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ CONNECTION_OK
Definition: libpq-fe.h:84
ExecStatusType
Definition: libpq-fe.h:123
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:142
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:136
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:138
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:139
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:140
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:482
PostgresPollingStatusType
Definition: libpq-fe.h:114
@ PGRES_POLLING_OK
Definition: libpq-fe.h:118
@ PGRES_POLLING_READING
Definition: libpq-fe.h:116
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:117
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:187
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:189
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:484
static void usage(const char *progname)
#define MAXINT8LEN
static void print_test_list(void)
static const char *const insert_sql2
static void confirm_query_canceled_impl(int line, PGconn *conn)
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
#define confirm_query_canceled(conn)
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static pg_noreturn void static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void test_protocol_version(PGconn *conn)
static void test_nosync(PGconn *conn)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static pg_noreturn void pg_fatal_impl(int line, const char *fmt,...) pg_attribute_printf(2
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
Definition: mcxt.c:2150
static AmcheckOptions opts
Definition: pg_amcheck.c:112
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:51
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:72
PGDLLIMPORT char * optarg
Definition: getopt.c:53
#define PG_IOLBF
Definition: port.h:389
#define sprintf
Definition: port.h:241
#define vfprintf
Definition: port.h:242
#define snprintf
Definition: port.h:239
#define printf(...)
Definition: port.h:245
unsigned int Oid
Definition: postgres_ext.h:30
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:52
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void pg_usleep(long microsec)
Definition: signal.c:53
PGconn * conn
Definition: streamutil.c:52
const char * keyword
Definition: regguts.h:323
const char * description
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503