PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
libpq_pipeline.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
5 *
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <sys/select.h>
19#include <sys/time.h>
20
21#include "catalog/pg_type_d.h"
22#include "libpq-fe.h"
23#include "pg_getopt.h"
24
25
/*
 * Forward declarations of static helpers.
 *
 * NOTE(review): the pg_fatal_impl prototype below has no terminating ';' in
 * this listing, and the embedded numbering jumps from 27 to 29, so a
 * continuation line appears to have been lost -- confirm against the
 * original source.
 */
26static void exit_nicely(PGconn *conn);
27static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
29static bool process_result(PGconn *conn, PGresult *res, int results,
30 int numsent);
31
/* Program name; pg_fatal_impl prints it as the prefix of every fatal message. */
32static const char *const progname = "libpq_pipeline";
33
34/* Options and defaults */
35static char *tracefile = NULL; /* path to PQtrace() file */
36
37
/*
 * pg_debug(): printf-style trace output to stderr.  It is only active when
 * DEBUG_OUTPUT is defined; otherwise it expands to nothing, so its arguments
 * are not evaluated.
 */
38#ifdef DEBUG_OUTPUT
39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40#else
41#define pg_debug(...)
42#endif
43
/*
 * SQL text shared by the tests below.  All statements operate on the
 * pq_pipeline_demo table.  insert_sql takes one parameter (itemno);
 * insert_sql2 takes two (itemno, int8filler).
 */
44static const char *const drop_table_sql =
45"DROP TABLE IF EXISTS pq_pipeline_demo";
/* The two adjacent literals below are concatenated into one statement. */
46static const char *const create_table_sql =
47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48"int8filler int8);";
49static const char *const insert_sql =
50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51static const char *const insert_sql2 =
52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53
54/* max char length of an int32/64, plus sign and null terminator */
/*
 * Buffer sizes for decimal text of integer parameters.
 *
 * NOTE(review): 12 bytes holds "-2147483648" plus the terminator.  20 bytes
 * holds any non-negative int64 plus the terminator, but the most negative
 * int64 ("-9223372036854775808") would need 21.  The only value formatted
 * into a MAXINT8LEN buffer in this file is 1LL << 62 (via snprintf), which
 * fits -- revisit the size if negative values are ever formatted.
 */
55#define MAXINTLEN 12
56#define MAXINT8LEN 20
57
/*
 * Terminate the process with exit status 1.
 *
 * NOTE(review): the line carrying this function's name and parameters is
 * absent from this listing (embedded numbering skips 59 and 61).  Presumably
 * this is exit_nicely(), declared near the top of the file, and a statement
 * preceding exit() has been lost -- confirm against the original source.
 */
58static void
60{
62 exit(1);
63}
64
65/*
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
68 */
69
70/*
71 * Print an error to stderr and terminate the program.
72 */
/* pg_fatal(): forwards the invocation's __LINE__ so the report names the call site. */
73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
/*
 * line: source line number printed after the program name
 * fmt:  printf-style format; must NOT end in a newline (asserted below),
 *       because one is appended here
 *
 * Does not return: the process exits with status 1.
 *
 * NOTE(review): no va_start()/va_end() around the vfprintf() call is visible
 * in this listing (embedded numbers 80, 83 and 85 are skipped); presumably
 * those lines were dropped by the extraction -- confirm.
 */
74static void
76pg_fatal_impl(int line, const char *fmt,...)
77{
78 va_list args;
79
81
82 fprintf(stderr, "\n%s:%d: ", progname, line);
84 vfprintf(stderr, fmt, args);
/* Callers must not supply their own trailing newline. */
86 Assert(fmt[strlen(fmt) - 1] != '\n');
87 fprintf(stderr, "\n");
88 exit(1);
89}
90
91/*
92 * Check that the query on the given connection got canceled.
93 */
/* The macro passes the caller's __LINE__ so failures are reported at the call site. */
94#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
/*
 * Implementation behind confirm_query_canceled(); the body uses `line` and
 * `conn`.  It fails fatally unless the result's SQLSTATE is "57014", which
 * the accompanying message treats as the cancellation error.
 *
 * NOTE(review): this listing is missing the name/parameter line and several
 * statements (embedded numbers 96, 100, 103-104, 108 and 112 are skipped):
 * the assignment of `res`, the tails of two pg_fatal_impl() calls, the
 * condition guarding "query did not fail", and the body of the final while
 * loop are not visible.  Confirm against the original source.
 */
95static void
97{
98 PGresult *res = NULL;
99
101 if (res == NULL)
102 pg_fatal_impl(line, "PQgetResult returned null: %s",
105 pg_fatal_impl(line, "query did not fail when it was expected");
106 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
107 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
109 PQclear(res);
110
111 while (PQisBusy(conn))
113}
114
115/*
116 * Using monitorConn, query pg_stat_activity to see that the connection with
117 * the given PID is either in the given state, or waiting on the given event
118 * (only one of them can be given).
119 */
/*
 * line:        caller's source line, passed on to pg_fatal_impl on failure
 * monitorConn: connection on which the pg_stat_activity query is run
 * procpid:     backend PID to look for
 * state/event: exactly one of the two must be non-NULL (asserted below)
 *
 * Polls every 10ms and returns once the count query yields a value other
 * than "0".  There is no timeout in this function.
 */
120static void
121wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
122 char *state, char *event)
123{
124 const Oid paramTypes[] = {INT4OID, TEXTOID};
125 const char *paramValues[2];
126 char *pidstr = psprintf("%d", procpid);
127
128 Assert((state == NULL) ^ (event == NULL));
129
130 paramValues[0] = pidstr;
/* Whichever of state/event was supplied becomes the second query parameter. */
131 paramValues[1] = state ? state : event;
132
133 while (true)
134 {
135 PGresult *res;
136 char *value;
137
138 if (state != NULL)
139 res = PQexecParams(monitorConn,
140 "SELECT count(*) FROM pg_stat_activity WHERE "
141 "pid = $1 AND state = $2",
142 2, paramTypes, paramValues, NULL, NULL, 0);
143 else
144 res = PQexecParams(monitorConn,
145 "SELECT count(*) FROM pg_stat_activity WHERE "
146 "pid = $1 AND wait_event = $2",
147 2, paramTypes, paramValues, NULL, NULL, 0);
148
/*
 * NOTE(review): embedded number 149 is skipped, so the condition guarding
 * the following pg_fatal_impl() call (presumably a result-status check) is
 * not visible in this listing -- confirm.
 */
150 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
151 if (PQntuples(res) != 1)
152 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
153 if (PQnfields(res) != 1)
154 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
155 value = PQgetvalue(res, 0, 0);
/* Any count other than "0" means a matching row exists: stop polling. */
156 if (strcmp(value, "0") != 0)
157 {
158 PQclear(res);
159 break;
160 }
161 PQclear(res);
162
163 /* wait 10ms before polling again */
164 pg_usleep(10000);
165 }
166
167 pfree(pidstr);
168}
169
/*
 * send_cancellable_query(): start a long pg_sleep() on conn and return once
 * monitorConn observes it waiting on "PgSleep".  The macro passes the
 * caller's __LINE__ so failures are reported at the call site.
 */
170#define send_cancellable_query(conn, monitorConn) \
171 send_cancellable_query_impl(__LINE__, conn, monitorConn)
/*
 * NOTE(review): the name/parameter line of the implementation is absent
 * from this listing (embedded number 173 is skipped); the body uses `line`,
 * `conn` and `monitorConn`, matching the macro above.
 */
172static void
174{
175 const char *env_wait;
176 const Oid paramTypes[1] = {INT4OID};
177
178 /*
179 * Wait for the connection to be idle, so that our check for an active
180 * connection below is reliable, instead of possibly seeing an outdated
181 * state.
182 */
183 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
184
/*
 * The sleep duration is the text of PG_TEST_TIMEOUT_DEFAULT, or "180" when
 * that environment variable is unset; it is sent as the query parameter.
 */
185 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
186 if (env_wait == NULL)
187 env_wait = "180";
188
189 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
190 &env_wait, NULL, NULL, 0) != 1)
191 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
192
193 /*
194 * Wait for the sleep to be active, because if the query is not running
195 * yet, the cancel request that we send won't have any effect.
196 */
197 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
198}
199
200/*
201 * Create a new connection with the same conninfo as the given one.
202 */
/*
 * Returns a new connection built from the keyword/value pairs in `opts`.
 * Only options whose value is non-NULL are passed on.  A connection that
 * is not CONNECTION_OK is a fatal error, so callers always receive a usable
 * connection.
 *
 * NOTE(review): the name/parameter line and the declaration of `opts` are
 * absent from this listing (embedded numbers 204 and 207 are skipped).  A
 * call site in this file invokes the helper as copy_connection(conn).
 * NOTE(review): the keywords/vals arrays (and `opts`) are not released in
 * the visible code -- confirm whether that is intentional for a test
 * program.
 */
203static PGconn *
205{
206 PGconn *copyConn;
208 const char **keywords;
209 const char **vals;
/* Starts at 1 to leave room for the terminating NULL entry. */
210 int nopts = 1;
211 int i = 0;
212
213 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
214 nopts++;
215
216 keywords = pg_malloc(sizeof(char *) * nopts);
217 vals = pg_malloc(sizeof(char *) * nopts);
218
219 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
220 {
221 if (opt->val)
222 {
223 keywords[i] = opt->keyword;
224 vals[i] = opt->val;
225 i++;
226 }
227 }
/* Terminate both parallel arrays after the last copied option. */
228 keywords[i] = vals[i] = NULL;
229
230 copyConn = PQconnectdbParams(keywords, vals, false);
231
232 if (PQstatus(copyConn) != CONNECTION_OK)
233 pg_fatal("Connection to database failed: %s",
234 PQerrorMessage(copyConn));
235
236 return copyConn;
237}
238
239/*
240 * Test query cancellation routines
241 */
/*
 * Exercises several ways of cancelling a running query on conn, as labelled
 * by the in-body comments: PQcancel (twice with the same PGcancel object),
 * PQrequestCancel, PQcancelBlocking, and a cancel connection driven by
 * polling, before and after PQcancelReset.  Every attempt is started with
 * send_cancellable_query() and observed through a second connection,
 * monitorConn.
 *
 * NOTE(review): this listing is missing the function's name/parameter line
 * and a number of statements (the embedded numbering has gaps, e.g. 243,
 * 246, 267, 285-286, 293-294, 301, 340-342, 348, 401).  `cancelConn` and
 * `pollres` are used below but their declarations and assignments are not
 * visible, and several pg_fatal() calls appear without their guarding
 * condition.  Confirm against the original source before editing.
 */
242static void
244{
245 PGcancel *cancel;
247 PGconn *monitorConn;
248 char errorbuf[256];
249
250 fprintf(stderr, "test cancellations... ");
251
252 if (PQsetnonblocking(conn, 1) != 0)
253 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
254
255 /*
256 * Make a separate connection to the database to monitor the query on the
257 * main connection.
258 */
259 monitorConn = copy_connection(conn);
260 Assert(PQstatus(monitorConn) == CONNECTION_OK);
261
262 /* test PQcancel */
263 send_cancellable_query(conn, monitorConn);
264 cancel = PQgetCancel(conn);
265 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
266 pg_fatal("failed to run PQcancel: %s", errorbuf);
268
269 /* PGcancel object can be reused for the next query */
270 send_cancellable_query(conn, monitorConn);
271 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
272 pg_fatal("failed to run PQcancel: %s", errorbuf);
274
275 PQfreeCancel(cancel);
276
277 /* test PQrequestCancel */
278 send_cancellable_query(conn, monitorConn);
279 if (!PQrequestCancel(conn))
280 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
282
283 /* test PQcancelBlocking */
284 send_cancellable_query(conn, monitorConn);
287 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
290
291 /* test PQcancelCreate and then polling with PQcancelPoll */
292 send_cancellable_query(conn, monitorConn);
295 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
/* Poll loop: wait on the cancel socket in the direction `pollres` asks for. */
296 while (true)
297 {
298 struct timeval tv;
299 fd_set input_mask;
300 fd_set output_mask;
302 int sock = PQcancelSocket(cancelConn);
303
304 if (pollres == PGRES_POLLING_OK)
305 break;
306
307 FD_ZERO(&input_mask);
308 FD_ZERO(&output_mask);
309 switch (pollres)
310 {
312 pg_debug("polling for reads\n");
313 FD_SET(sock, &input_mask);
314 break;
316 pg_debug("polling for writes\n");
317 FD_SET(sock, &output_mask);
318 break;
319 default:
320 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
321 }
322
/*
 * NOTE(review): sock has already been passed to FD_SET() above by the time
 * this validity check runs.
 */
323 if (sock < 0)
324 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
325
326 tv.tv_sec = 3;
327 tv.tv_usec = 0;
328
/* Wait with a 3 second timeout value; retry if select() was interrupted. */
329 while (true)
330 {
331 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
332 {
333 if (errno == EINTR)
334 continue;
335 pg_fatal("select() failed: %m");
336 }
337 break;
338 }
339 }
341 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
343
344 /*
345 * test PQcancelReset works on the cancel connection and it can be reused
346 * afterwards
347 */
349
350 send_cancellable_query(conn, monitorConn);
352 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
/* Same poll loop as above, run again on the reset cancel connection. */
353 while (true)
354 {
355 struct timeval tv;
356 fd_set input_mask;
357 fd_set output_mask;
359 int sock = PQcancelSocket(cancelConn);
360
361 if (pollres == PGRES_POLLING_OK)
362 break;
363
364 FD_ZERO(&input_mask);
365 FD_ZERO(&output_mask);
366 switch (pollres)
367 {
369 pg_debug("polling for reads\n");
370 FD_SET(sock, &input_mask);
371 break;
373 pg_debug("polling for writes\n");
374 FD_SET(sock, &output_mask);
375 break;
376 default:
377 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
378 }
379
380 if (sock < 0)
381 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
382
383 tv.tv_sec = 3;
384 tv.tv_usec = 0;
385
386 while (true)
387 {
388 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
389 {
390 if (errno == EINTR)
391 continue;
392 pg_fatal("select() failed: %m");
393 }
394 break;
395 }
396 }
398 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
400
402
403 fprintf(stderr, "ok\n");
404}
405
/*
 * "test error cases": checks that PQexec and PQsendQuery are rejected while
 * in pipeline mode with the exact error texts compared below, that entering
 * pipeline mode twice and exiting it twice both succeed, and that PQexec is
 * usable again after leaving pipeline mode.
 *
 * NOTE(review): the name/parameter line and several conditions and call
 * tails are absent from this listing (embedded numbering skips e.g. 407,
 * 413, 419, 424, 429, 437, 450, 459, 461), so some pg_fatal() calls appear
 * unguarded or unterminated here.  Confirm against the original source.
 */
406static void
408{
409 PGresult *res = NULL;
410
411 fprintf(stderr, "test error cases... ");
412
414 pg_fatal("Expected blocking connection mode");
415
416 if (PQenterPipelineMode(conn) != 1)
417 pg_fatal("Unable to enter pipeline mode");
418
420 pg_fatal("Pipeline mode not activated properly");
421
422 /* PQexec should fail in pipeline mode */
423 res = PQexec(conn, "SELECT 1");
425 pg_fatal("PQexec should fail in pipeline mode but succeeded");
/* The comparison includes the trailing newline of the error message. */
426 if (strcmp(PQerrorMessage(conn),
427 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
428 pg_fatal("did not get expected error message; got: \"%s\"",
430
431 /* PQsendQuery should fail in pipeline mode */
432 if (PQsendQuery(conn, "SELECT 1") != 0)
433 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
434 if (strcmp(PQerrorMessage(conn),
435 "PQsendQuery not allowed in pipeline mode\n") != 0)
436 pg_fatal("did not get expected error message; got: \"%s\"",
438
439 /* Entering pipeline mode when already in pipeline mode is OK */
440 if (PQenterPipelineMode(conn) != 1)
441 pg_fatal("re-entering pipeline mode should be a no-op but failed");
442
443 if (PQisBusy(conn) != 0)
444 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
445
446 /* ok, back to normal command mode */
447 if (PQexitPipelineMode(conn) != 1)
448 pg_fatal("couldn't exit idle empty pipeline mode");
449
451 pg_fatal("Pipeline mode not terminated properly");
452
453 /* exiting pipeline mode when not in pipeline mode should be a no-op */
454 if (PQexitPipelineMode(conn) != 1)
455 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
456
457 /* can now PQexec again */
458 res = PQexec(conn, "SELECT 1");
460 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
462
463 fprintf(stderr, "ok\n");
464}
465
/*
 * "multi pipeline": queues three one-query pipelines ("SELECT $1" with the
 * parameter "1") back to back, ending them with PQpipelineSync,
 * PQsendPipelineSync and PQpipelineSync respectively.  It then reads the
 * results in order, checking that PQexitPipelineMode() fails (returns 0)
 * while results are still pending and succeeds at the end.
 *
 * NOTE(review): the name/parameter line, every `res = ...` assignment in
 * the result-processing half, and many conditions and call tails are absent
 * from this listing (the embedded numbering has gaps throughout, e.g. 467,
 * 511, 514, 516, 518, 528).  Confirm against the original source.
 */
466static void
468{
469 PGresult *res = NULL;
470 const char *dummy_params[1] = {"1"};
471 Oid dummy_param_oids[1] = {INT4OID};
472
473 fprintf(stderr, "multi pipeline... ");
474
475 /*
476 * Queue up a couple of small pipelines and process each without returning
477 * to command mode first.
478 */
479 if (PQenterPipelineMode(conn) != 1)
480 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
481
482 /* first pipeline */
483 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
484 dummy_params, NULL, NULL, 0) != 1)
485 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
486
487 if (PQpipelineSync(conn) != 1)
488 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
489
490 /* second pipeline */
491 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
492 dummy_params, NULL, NULL, 0) != 1)
493 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
494
495 /* Skip flushing once. */
496 if (PQsendPipelineSync(conn) != 1)
497 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
498
499 /* third pipeline */
500 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
501 dummy_params, NULL, NULL, 0) != 1)
502 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
503
504 if (PQpipelineSync(conn) != 1)
505 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
506
507 /* OK, start processing the results */
508
509 /* first pipeline */
510
512 if (res == NULL)
513 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
515
517 pg_fatal("Unexpected result code %s from first pipeline item",
519 PQclear(res);
520 res = NULL;
521
522 if (PQgetResult(conn) != NULL)
523 pg_fatal("PQgetResult returned something extra after first result");
524
/* Leaving pipeline mode must be refused while the sync result is unread. */
525 if (PQexitPipelineMode(conn) != 0)
526 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
527
529 if (res == NULL)
530 pg_fatal("PQgetResult returned null when sync result expected: %s",
532
534 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
536 PQclear(res);
537
538 /* second pipeline */
539
541 if (res == NULL)
542 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
544
546 pg_fatal("Unexpected result code %s from second pipeline item",
548 PQclear(res);
549 res = NULL;
550
/*
 * NOTE(review): the message below says "first result" although this is the
 * second pipeline; it looks copied from the first-pipeline check.
 */
551 if (PQgetResult(conn) != NULL)
552 pg_fatal("PQgetResult returned something extra after first result");
553
554 if (PQexitPipelineMode(conn) != 0)
555 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
556
558 if (res == NULL)
559 pg_fatal("PQgetResult returned null when sync result expected: %s",
561
563 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
565 PQclear(res);
566
567 /* third pipeline */
568
570 if (res == NULL)
571 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
573
575 pg_fatal("Unexpected result code %s from third pipeline item",
577
579 if (res != NULL)
580 pg_fatal("Expected null result, got %s",
582
584 if (res == NULL)
585 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
587
589 pg_fatal("Unexpected result code %s from second pipeline sync",
591
592 /* We're still in pipeline mode ... */
594 pg_fatal("Fell out of pipeline mode somehow");
595
596 /* until we end it, which we can safely do now */
597 if (PQexitPipelineMode(conn) != 1)
598 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
600
602 pg_fatal("exiting pipeline mode didn't seem to work");
603
604 fprintf(stderr, "ok\n");
605}
606
607/*
608 * Test behavior when a pipeline dispatches a number of commands that are
609 * not flushed by a sync point.
610 */
/*
 * "nosync": sends numqueries (10) identical SELECTs in pipeline mode with
 * no sync point, opportunistically reading whatever the server has already
 * sent after each one.  It then issues PQsendFlushRequest() and expects
 * exactly one result followed by one NULL for every query.
 *
 * NOTE(review): the name/parameter line and some statements are absent from
 * this listing (embedded numbering skips 612, 645, 661, 668, 676).  The
 * assignment of `res` in the read loop and the condition opening the
 * per-result block are not visible.  Confirm against the original source.
 */
611static void
613{
614 int numqueries = 10;
615 int results = 0;
616 int sock = PQsocket(conn);
617
618 fprintf(stderr, "nosync... ");
619
620 if (sock < 0)
621 pg_fatal("invalid socket");
622
623 if (PQenterPipelineMode(conn) != 1)
624 pg_fatal("could not enter pipeline mode");
625 for (int i = 0; i < numqueries; i++)
626 {
627 fd_set input_mask;
628 struct timeval tv;
629
630 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
631 0, NULL, NULL, NULL, NULL, 0) != 1)
632 pg_fatal("error sending select: %s", PQerrorMessage(conn));
633 PQflush(conn);
634
635 /*
636 * If the server has written anything to us, read (some of) it now.
637 */
638 FD_ZERO(&input_mask);
639 FD_SET(sock, &input_mask);
/* Zero timeout: select() only reports what is ready right now. */
640 tv.tv_sec = 0;
641 tv.tv_usec = 0;
642 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
643 {
644 fprintf(stderr, "select() failed: %m\n");
/* NOTE(review): embedded number 645 is skipped; as shown, this branch only prints. */
646 }
647 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
648 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
649 }
650
651 /* tell server to flush its output buffer */
652 if (PQsendFlushRequest(conn) != 1)
653 pg_fatal("failed to send flush request");
654 PQflush(conn);
655
656 /* Now read all results */
657 for (;;)
658 {
659 PGresult *res;
660
662
663 /* NULL results are only expected after TUPLES_OK */
664 if (res == NULL)
665 pg_fatal("got unexpected NULL result after %d results", results);
666
667 /* We expect exactly one TUPLES_OK result for each query we sent */
669 {
670 PGresult *res2;
671
672 /* and one NULL result should follow each */
673 res2 = PQgetResult(conn);
674 if (res2 != NULL)
675 pg_fatal("expected NULL, got %s",
677 PQclear(res);
678 results++;
679
680 /* if we're done, we're done */
681 if (results == numqueries)
682 break;
683
684 continue;
685 }
686
687 /* anything else is unexpected */
688 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
689 }
690
691 fprintf(stderr, "ok\n");
692}
693
694/*
695 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
696 * still have to get results for each pipeline item, but the item will just be
697 * a PGRES_PIPELINE_ABORTED code.
698 *
699 * This intentionally doesn't use a transaction to wrap the pipeline. You should
700 * usually use an xact, but in this case we want to observe the effects of each
701 * statement.
702 */
/*
 * "aborted pipeline": sends a pipeline whose second statement calls
 * no_such_function(), followed by a second pipeline with a single insert,
 * and walks through every expected result.  It then checks a two-statement
 * query string (SQLSTATE 42601) and a query that fails partway with
 * division by zero (SQLSTATE 22012), and finally verifies that only the
 * row with itemno 3 is in the table.
 *
 * NOTE(review): the name/parameter line, most `res = ...` assignments, and
 * many conditions, case labels and call tails are absent from this listing
 * (the embedded numbering has gaps throughout, e.g. 704, 715-716, 719-720,
 * 765, 768, 881, 909, 916, 920).  Confirm against the original source
 * before editing.
 */
703static void
705{
706 PGresult *res = NULL;
707 const char *dummy_params[1] = {"1"};
708 Oid dummy_param_oids[1] = {INT4OID};
709 int i;
710 int gotrows;
711 bool goterror;
712
713 fprintf(stderr, "aborted pipeline... ");
714
717 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
718
721 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
722
723 /*
724 * Queue up a couple of small pipelines and process each without returning
725 * to command mode first. Make sure the second operation in the first
726 * pipeline ERRORs.
727 */
728 if (PQenterPipelineMode(conn) != 1)
729 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
730
731 dummy_params[0] = "1";
732 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
733 dummy_params, NULL, NULL, 0) != 1)
734 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
735
736 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
737 1, dummy_param_oids, dummy_params,
738 NULL, NULL, 0) != 1)
739 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
740
741 dummy_params[0] = "2";
742 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
743 dummy_params, NULL, NULL, 0) != 1)
744 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
745
746 if (PQpipelineSync(conn) != 1)
747 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
748
749 dummy_params[0] = "3";
750 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
751 dummy_params, NULL, NULL, 0) != 1)
752 pg_fatal("dispatching second-pipeline insert failed: %s",
754
755 if (PQpipelineSync(conn) != 1)
756 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
757
758 /*
759 * OK, start processing the pipeline results.
760 *
761 * We should get a command-ok for the first query, then a fatal error and
762 * a pipeline aborted message for the second insert, a pipeline-end, then
763 * a command-ok and a pipeline-ok for the second pipeline operation.
764 */
766 if (res == NULL)
767 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
769 pg_fatal("Unexpected result status %s: %s",
772 PQclear(res);
773
774 /* NULL result to signal end-of-results for this command */
775 if ((res = PQgetResult(conn)) != NULL)
776 pg_fatal("Expected null result, got %s",
778
779 /* Second query caused error, so we expect an error next */
781 if (res == NULL)
782 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
784 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
786 PQclear(res);
787
788 /* NULL result to signal end-of-results for this command */
789 if ((res = PQgetResult(conn)) != NULL)
790 pg_fatal("Expected null result, got %s",
792
793 /*
794 * pipeline should now be aborted.
795 *
796 * Note that we could still queue more queries at this point if we wanted;
797 * they'd get added to a new third pipeline since we've already sent a
798 * second. The aborted flag relates only to the pipeline being received.
799 */
801 pg_fatal("pipeline should be flagged as aborted but isn't");
802
803 /* third query in pipeline, the second insert */
805 if (res == NULL)
806 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
808 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
810 PQclear(res);
811
812 /* NULL result to signal end-of-results for this command */
813 if ((res = PQgetResult(conn)) != NULL)
814 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
815
817 pg_fatal("pipeline should be flagged as aborted but isn't");
818
819 /* Ensure we're still in pipeline */
821 pg_fatal("Fell out of pipeline mode somehow");
822
823 /*
824 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
825 *
826 * (This is so clients know to start processing results normally again and
827 * can tell the difference between skipped commands and the sync.)
828 */
830 if (res == NULL)
831 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
833 pg_fatal("Unexpected result code from first pipeline sync\n"
834 "Expected PGRES_PIPELINE_SYNC, got %s",
836 PQclear(res);
837
839 pg_fatal("sync should've cleared the aborted flag but didn't");
840
841 /* We're still in pipeline mode... */
843 pg_fatal("Fell out of pipeline mode somehow");
844
845 /* the insert from the second pipeline */
847 if (res == NULL)
848 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
850 pg_fatal("Unexpected result code %s from first item in second pipeline",
852 PQclear(res);
853
854 /* Read the NULL result at the end of the command */
855 if ((res = PQgetResult(conn)) != NULL)
856 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
857
858 /* the second pipeline sync */
859 if ((res = PQgetResult(conn)) == NULL)
860 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
862 pg_fatal("Unexpected result code %s from second pipeline sync",
864 PQclear(res);
865
866 if ((res = PQgetResult(conn)) != NULL)
867 pg_fatal("Expected null result, got %s: %s",
870
871 /* Try to send two queries in one command */
872 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
873 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
874 if (PQpipelineSync(conn) != 1)
875 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
876 goterror = false;
/*
 * NOTE(review): unlike the later single-row loop, no PQclear(res) is visible
 * inside this loop, so each result fetched here appears to be leaked --
 * confirm against the original source.
 */
877 while ((res = PQgetResult(conn)) != NULL)
878 {
879 switch (PQresultStatus(res))
880 {
882 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
883 pg_fatal("expected error about multiple commands, got %s",
885 printf("got expected %s", PQerrorMessage(conn));
886 goterror = true;
887 break;
888 default:
889 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
890 break;
891 }
892 }
893 if (!goterror)
894 pg_fatal("did not get cannot-insert-multiple-commands error");
896 if (res == NULL)
897 pg_fatal("got NULL result");
899 pg_fatal("Unexpected result code %s from pipeline sync",
/*
 * NOTE(review): "ok" is printed here and again at the end of this function,
 * so this test reports success twice.
 */
901 fprintf(stderr, "ok\n");
902
903 /* Test single-row mode with an error partway through */
904 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
905 0, NULL, NULL, NULL, NULL, 0) != 1)
906 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
907 if (PQpipelineSync(conn) != 1)
908 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
910 goterror = false;
911 gotrows = 0;
912 while ((res = PQgetResult(conn)) != NULL)
913 {
914 switch (PQresultStatus(res))
915 {
917 printf("got row: %s\n", PQgetvalue(res, 0, 0));
918 gotrows++;
919 break;
921 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
922 pg_fatal("expected division-by-zero, got: %s (%s)",
925 printf("got expected division-by-zero\n");
926 goterror = true;
927 break;
928 default:
929 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
930 }
931 PQclear(res);
932 }
933 if (!goterror)
934 pg_fatal("did not get division-by-zero error");
/* Three rows (g = 3, 2, 1) are expected before the division by zero at g = 0. */
935 if (gotrows != 3)
936 pg_fatal("did not get three rows");
937 /* the third pipeline sync */
938 if ((res = PQgetResult(conn)) == NULL)
939 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
941 pg_fatal("Unexpected result code %s from third pipeline sync",
943 PQclear(res);
944
945 /* We're still in pipeline mode... */
947 pg_fatal("Fell out of pipeline mode somehow");
948
949 /* until we end it, which we can safely do now */
950 if (PQexitPipelineMode(conn) != 1)
951 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
953
955 pg_fatal("exiting pipeline mode didn't seem to work");
956
957 /*-
958 * Since we fired the pipelines off without a surrounding xact, the results
959 * should be:
960 *
961 * - Implicit xact started by server around 1st pipeline
962 * - First insert applied
963 * - Second statement aborted xact
964 * - Third insert skipped
965 * - Sync rolled back first implicit xact
966 * - Implicit xact created by server around 2nd pipeline
967 * - insert applied from 2nd pipeline
968 * - Sync commits 2nd xact
969 *
970 * So we should only have the value 3 that we inserted.
971 */
972 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
973
975 pg_fatal("Expected tuples, got %s: %s",
977 if (PQntuples(res) != 1)
978 pg_fatal("expected 1 result, got %d", PQntuples(res));
979 for (i = 0; i < PQntuples(res); i++)
980 {
981 const char *val = PQgetvalue(res, i, 0);
982
983 if (strcmp(val, "3") != 0)
984 pg_fatal("expected only insert with value 3, got %s", val);
985 }
986
987 PQclear(res);
988
989 fprintf(stderr, "ok\n");
990}
991
992/* State machine enum for test_pipelined_insert */
/*
 * NOTE(review): the enum's header line and all of its members are absent
 * from this listing (embedded numbers 993 and 995-1002 are skipped).  Code
 * further down refers to `enum PipelineInsertStep` and to BI_BEGIN_TX,
 * BI_DROP_TABLE, BI_CREATE_TABLE, BI_PREPARE, BI_INSERT_ROWS, BI_COMMIT_TX,
 * BI_SYNC and BI_DONE.  It advances steps with ++ and compares with
 * `< BI_DONE`, so the declaration order of the members matters.
 */
994{
1003};
1004
/*
 * Runs BEGIN, DROP TABLE, CREATE TABLE and a PREPARE of insert_sql2 in
 * pipeline mode, then sends n_rows executions of the prepared insert,
 * followed by COMMIT and a pipeline sync, all in nonblocking mode.  Sending
 * and receiving are tracked by two separate state variables (send_step and
 * recv_step) so results are consumed while more rows are still being sent.
 *
 * NOTE(review): the name/parameter line is absent from this listing
 * (embedded number 1006 is skipped); `conn` and `n_rows` are used without a
 * visible declaration and are presumably the parameters.  Several other
 * lines are missing too (e.g. 1096, 1105, 1167, 1221, 1234, 1243).  Confirm
 * against the original source before editing.
 */
1005static void
1007{
1008 Oid insert_param_oids[2] = {INT4OID, INT8OID};
1009 const char *insert_params[2];
1010 char insert_param_0[MAXINTLEN];
1011 char insert_param_1[MAXINT8LEN];
1012 enum PipelineInsertStep send_step = BI_BEGIN_TX,
1013 recv_step = BI_BEGIN_TX;
1014 int rows_to_send,
1015 rows_to_receive;
1016
/* Both parameter slots point at stack buffers that are rewritten per row. */
1017 insert_params[0] = insert_param_0;
1018 insert_params[1] = insert_param_1;
1019
1020 rows_to_send = rows_to_receive = n_rows;
1021
1022 /*
1023 * Do a pipelined insert into a table created at the start of the pipeline
1024 */
1025 if (PQenterPipelineMode(conn) != 1)
1026 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1027
/* Send the three setup statements, one per loop iteration. */
1028 while (send_step != BI_PREPARE)
1029 {
1030 const char *sql;
1031
1032 switch (send_step)
1033 {
1034 case BI_BEGIN_TX:
1035 sql = "BEGIN TRANSACTION";
1036 send_step = BI_DROP_TABLE;
1037 break;
1038
1039 case BI_DROP_TABLE:
1040 sql = drop_table_sql;
1041 send_step = BI_CREATE_TABLE;
1042 break;
1043
1044 case BI_CREATE_TABLE:
1045 sql = create_table_sql;
1046 send_step = BI_PREPARE;
1047 break;
1048
1049 default:
1050 pg_fatal("invalid state");
1051 sql = NULL; /* keep compiler quiet */
1052 }
1053
1054 pg_debug("sending: %s\n", sql);
1055 if (PQsendQueryParams(conn, sql,
1056 0, NULL, NULL, NULL, NULL, 0) != 1)
1057 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1058 }
1059
1060 Assert(send_step == BI_PREPARE);
1061 pg_debug("sending: %s\n", insert_sql2);
1062 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1063 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1064 send_step = BI_INSERT_ROWS;
1065
1066 /*
1067 * Now we start inserting. We'll be sending enough data that we could fill
1068 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1069 * mode and consume input while we send more output. As results of each
1070 * query are processed we should pop them to allow processing of the next
1071 * query. There's no need to finish the pipeline before processing
1072 * results.
1073 */
1074 if (PQsetnonblocking(conn, 1) != 0)
1075 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1076
1077 while (recv_step != BI_DONE)
1078 {
1079 int sock;
1080 fd_set input_mask;
1081 fd_set output_mask;
1082
1083 sock = PQsocket(conn);
1084
1085 if (sock < 0)
1086 break; /* shouldn't happen */
1087
1088 FD_ZERO(&input_mask);
1089 FD_SET(sock, &input_mask);
1090 FD_ZERO(&output_mask);
1091 FD_SET(sock, &output_mask);
1092
/* No timeout: block until the socket is readable or writable. */
1093 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1094 {
1095 fprintf(stderr, "select() failed: %m\n");
/* NOTE(review): embedded number 1096 is skipped; as shown, this branch only prints. */
1097 }
1098
1099 /*
1100 * Process any results, so we keep the server's output buffer free
1101 * flowing and it can continue to process input
1102 */
1103 if (FD_ISSET(sock, &input_mask))
1104 {
1106
1107 /* Read until we'd block if we tried to read */
1108 while (!PQisBusy(conn) && recv_step < BI_DONE)
1109 {
1110 PGresult *res;
1111 const char *cmdtag = "";
1112 const char *description = "";
1113 int status;
1114
1115 /*
1116 * Read next result. If no more results from this query,
1117 * advance to the next query
1118 */
1119 res = PQgetResult(conn);
1120 if (res == NULL)
1121 continue;
1122
/* Every step expects PGRES_COMMAND_OK except BI_SYNC, which overrides it. */
1123 status = PGRES_COMMAND_OK;
1124 switch (recv_step)
1125 {
1126 case BI_BEGIN_TX:
1127 cmdtag = "BEGIN";
1128 recv_step++;
1129 break;
1130 case BI_DROP_TABLE:
1131 cmdtag = "DROP TABLE";
1132 recv_step++;
1133 break;
1134 case BI_CREATE_TABLE:
1135 cmdtag = "CREATE TABLE";
1136 recv_step++;
1137 break;
1138 case BI_PREPARE:
1139 cmdtag = "";
1140 description = "PREPARE";
1141 recv_step++;
1142 break;
1143 case BI_INSERT_ROWS:
1144 cmdtag = "INSERT";
1145 rows_to_receive--;
1146 if (rows_to_receive == 0)
1147 recv_step++;
1148 break;
1149 case BI_COMMIT_TX:
1150 cmdtag = "COMMIT";
1151 recv_step++;
1152 break;
1153 case BI_SYNC:
1154 cmdtag = "";
1155 description = "SYNC";
1156 status = PGRES_PIPELINE_SYNC;
1157 recv_step++;
1158 break;
1159 case BI_DONE:
1160 /* unreachable */
1161 pg_fatal("unreachable state");
1162 }
1163
1164 if (PQresultStatus(res) != status)
1165 pg_fatal("%s reported status %s, expected %s\n"
1166 "Error message: \"%s\"",
1168 PQresStatus(status), PQerrorMessage(conn));
1169
/* Prefix match only: an empty cmdtag therefore accepts any command status. */
1170 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1171 pg_fatal("%s expected command tag '%s', got '%s'",
1172 description, cmdtag, PQcmdStatus(res));
1173
1174 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1175
1176 PQclear(res);
1177 }
1178 }
1179
1180 /* Write more rows and/or the end pipeline message, if needed */
1181 if (FD_ISSET(sock, &output_mask))
1182 {
1183 PQflush(conn);
1184
1185 if (send_step == BI_INSERT_ROWS)
1186 {
1187 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1188 /* use up some buffer space with a wide value */
1189 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1190
1191 if (PQsendQueryPrepared(conn, "my_insert",
1192 2, insert_params, NULL, NULL, 0) == 1)
1193 {
1194 pg_debug("sent row %d\n", rows_to_send);
1195
1196 rows_to_send--;
1197 if (rows_to_send == 0)
1198 send_step++;
1199 }
1200 else
1201 {
1202 /*
1203 * in nonblocking mode, so it's OK for an insert to fail
1204 * to send
1205 */
1206 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1207 rows_to_send, PQerrorMessage(conn));
1208 }
1209 }
1210 else if (send_step == BI_COMMIT_TX)
1211 {
1212 if (PQsendQueryParams(conn, "COMMIT",
1213 0, NULL, NULL, NULL, NULL, 0) == 1)
1214 {
1215 pg_debug("sent COMMIT\n");
1216 send_step++;
1217 }
1218 else
1219 {
1220 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1222 }
1223 }
1224 else if (send_step == BI_SYNC)
1225 {
1226 if (PQpipelineSync(conn) == 1)
1227 {
1228 fprintf(stdout, "pipeline sync sent\n");
1229 send_step++;
1230 }
1231 else
1232 {
1233 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1235 }
1236 }
1237 }
1238 }
1239
1240 /* We've got the sync message and the pipeline should be done */
1241 if (PQexitPipelineMode(conn) != 1)
1242 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1244
1245 if (PQsetnonblocking(conn, 0) != 0)
1246 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1247
1248 fprintf(stderr, "ok\n");
1249}
1250
1251static void
1253{
1254 PGresult *res = NULL;
1255 Oid param_oids[1] = {INT4OID};
1256 Oid expected_oids[4];
1257 Oid typ;
1258
1259 fprintf(stderr, "prepared... ");
1260
1261 if (PQenterPipelineMode(conn) != 1)
1262 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1263 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1264 "interval '1 sec'",
1265 1, param_oids) != 1)
1266 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1267 expected_oids[0] = INT4OID;
1268 expected_oids[1] = TEXTOID;
1269 expected_oids[2] = NUMERICOID;
1270 expected_oids[3] = INTERVALOID;
1271 if (PQsendDescribePrepared(conn, "select_one") != 1)
1272 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1273 if (PQpipelineSync(conn) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275
1276 res = PQgetResult(conn);
1277 if (res == NULL)
1278 pg_fatal("PQgetResult returned null");
1280 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1281 PQclear(res);
1282 res = PQgetResult(conn);
1283 if (res != NULL)
1284 pg_fatal("expected NULL result");
1285
1286 res = PQgetResult(conn);
1287 if (res == NULL)
1288 pg_fatal("PQgetResult returned NULL");
1290 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1291 if (PQnfields(res) != lengthof(expected_oids))
1292 pg_fatal("expected %zu columns, got %d",
1293 lengthof(expected_oids), PQnfields(res));
1294 for (int i = 0; i < PQnfields(res); i++)
1295 {
1296 typ = PQftype(res, i);
1297 if (typ != expected_oids[i])
1298 pg_fatal("field %d: expected type %u, got %u",
1299 i, expected_oids[i], typ);
1300 }
1301 PQclear(res);
1302 res = PQgetResult(conn);
1303 if (res != NULL)
1304 pg_fatal("expected NULL result");
1305
1306 res = PQgetResult(conn);
1308 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1309
1310 fprintf(stderr, "closing statement..");
1311 if (PQsendClosePrepared(conn, "select_one") != 1)
1312 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1313 if (PQpipelineSync(conn) != 1)
1314 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1315
1316 res = PQgetResult(conn);
1317 if (res == NULL)
1318 pg_fatal("expected non-NULL result");
1320 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1321 PQclear(res);
1322 res = PQgetResult(conn);
1323 if (res != NULL)
1324 pg_fatal("expected NULL result");
1325 res = PQgetResult(conn);
1327 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1328
1329 if (PQexitPipelineMode(conn) != 1)
1330 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1331
1332 /* Now that it's closed we should get an error when describing */
1333 res = PQdescribePrepared(conn, "select_one");
1335 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1336
1337 /*
1338 * Also test the blocking close, this should not fail since closing a
1339 * non-existent prepared statement is a no-op
1340 */
1341 res = PQclosePrepared(conn, "select_one");
1343 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1344
1345 fprintf(stderr, "creating portal... ");
1346 PQexec(conn, "BEGIN");
1347 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1349 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1350 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1351 if (PQpipelineSync(conn) != 1)
1352 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353 res = PQgetResult(conn);
1354 if (res == NULL)
1355 pg_fatal("PQgetResult returned null");
1357 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1358
1359 typ = PQftype(res, 0);
1360 if (typ != INT4OID)
1361 pg_fatal("portal: expected type %u, got %u",
1362 INT4OID, typ);
1363 PQclear(res);
1364 res = PQgetResult(conn);
1365 if (res != NULL)
1366 pg_fatal("expected NULL result");
1367 res = PQgetResult(conn);
1369 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1370
1371 fprintf(stderr, "closing portal... ");
1372 if (PQsendClosePortal(conn, "cursor_one") != 1)
1373 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1374 if (PQpipelineSync(conn) != 1)
1375 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1376
1377 res = PQgetResult(conn);
1378 if (res == NULL)
1379 pg_fatal("expected non-NULL result");
1381 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1382 PQclear(res);
1383 res = PQgetResult(conn);
1384 if (res != NULL)
1385 pg_fatal("expected NULL result");
1386 res = PQgetResult(conn);
1388 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1389
1390 if (PQexitPipelineMode(conn) != 1)
1391 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1392
1393 /* Now that it's closed we should get an error when describing */
1394 res = PQdescribePortal(conn, "cursor_one");
1396 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1397
1398 /*
1399 * Also test the blocking close, this should not fail since closing a
1400 * non-existent portal is a no-op
1401 */
1402 res = PQclosePortal(conn, "cursor_one");
1404 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1405
1406 fprintf(stderr, "ok\n");
1407}
1408
/*
 * Notice processor callback: count each notice and echo it to stderr.
 *
 * "arg" must point to the caller's int counter; it is incremented once per
 * call.  "message" is printed after a "NOTICE <count>: " prefix exactly as
 * received -- no newline is appended here.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *n_notices = (int *) arg;

	/* bump the counter first so the printed number is 1-based */
	(*n_notices)++;
	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
}
1418
1419/* Verify behavior in "idle" state */
1420static void
1422{
1423 PGresult *res;
1424 int n_notices = 0;
1425
1426 fprintf(stderr, "\npipeline idle...\n");
1427
1429
1430 /* Try to exit pipeline mode in pipeline-idle state */
1431 if (PQenterPipelineMode(conn) != 1)
1432 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1433 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1434 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1436 res = PQgetResult(conn);
1437 if (res == NULL)
1438 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441 pg_fatal("unexpected result code %s from first pipeline item",
1443 PQclear(res);
1444 res = PQgetResult(conn);
1445 if (res != NULL)
1446 pg_fatal("did not receive terminating NULL");
1447 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1448 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1449 if (PQexitPipelineMode(conn) == 1)
1450 pg_fatal("exiting pipeline succeeded when it shouldn't");
1451 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1452 strlen("cannot exit pipeline mode")) != 0)
1453 pg_fatal("did not get expected error; got: %s",
1456 res = PQgetResult(conn);
1458 pg_fatal("unexpected result code %s from second pipeline item",
1460 PQclear(res);
1461 res = PQgetResult(conn);
1462 if (res != NULL)
1463 pg_fatal("did not receive terminating NULL");
1464 if (PQexitPipelineMode(conn) != 1)
1465 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1466
1467 if (n_notices > 0)
1468 pg_fatal("got %d notice(s)", n_notices);
1469 fprintf(stderr, "ok - 1\n");
1470
1471 /* Have a WARNING in the middle of a resultset */
1472 if (PQenterPipelineMode(conn) != 1)
1473 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1474 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1475 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1477 res = PQgetResult(conn);
1478 if (res == NULL)
1479 pg_fatal("unexpected NULL result received");
1481 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1482 if (PQexitPipelineMode(conn) != 1)
1483 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1484 fprintf(stderr, "ok - 2\n");
1485}
1486
1487static void
1489{
1490 PGresult *res = NULL;
1491 const char *dummy_params[1] = {"1"};
1492 Oid dummy_param_oids[1] = {INT4OID};
1493
1494 fprintf(stderr, "simple pipeline... ");
1495
1496 /*
1497 * Enter pipeline mode and dispatch a set of operations, which we'll then
1498 * process the results of as they come in.
1499 *
1500 * For a simple case we should be able to do this without interim
1501 * processing of results since our output buffer will give us enough slush
1502 * to work with and we won't block on sending. So blocking mode is fine.
1503 */
1504 if (PQisnonblocking(conn))
1505 pg_fatal("Expected blocking connection mode");
1506
1507 if (PQenterPipelineMode(conn) != 1)
1508 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1509
1510 if (PQsendQueryParams(conn, "SELECT $1",
1511 1, dummy_param_oids, dummy_params,
1512 NULL, NULL, 0) != 1)
1513 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1514
1515 if (PQexitPipelineMode(conn) != 0)
1516 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1517
1518 if (PQpipelineSync(conn) != 1)
1519 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1520
1521 res = PQgetResult(conn);
1522 if (res == NULL)
1523 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1525
1527 pg_fatal("Unexpected result code %s from first pipeline item",
1529
1530 PQclear(res);
1531 res = NULL;
1532
1533 if (PQgetResult(conn) != NULL)
1534 pg_fatal("PQgetResult returned something extra after first query result.");
1535
1536 /*
1537 * Even though we've processed the result there's still a sync to come and
1538 * we can't exit pipeline mode yet
1539 */
1540 if (PQexitPipelineMode(conn) != 0)
1541 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1542
1543 res = PQgetResult(conn);
1544 if (res == NULL)
1545 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1547
1549 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1551
1552 PQclear(res);
1553 res = NULL;
1554
1555 if (PQgetResult(conn) != NULL)
1556 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1558
1559 /* We're still in pipeline mode... */
1561 pg_fatal("Fell out of pipeline mode somehow");
1562
1563 /* ... until we end it, which we can safely do now */
1564 if (PQexitPipelineMode(conn) != 1)
1565 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1567
1569 pg_fatal("Exiting pipeline mode didn't seem to work");
1570
1571 fprintf(stderr, "ok\n");
1572}
1573
1574static void
1576{
1577 PGresult *res;
1578 int i;
1579 bool pipeline_ended = false;
1580
1581 if (PQenterPipelineMode(conn) != 1)
1582 pg_fatal("failed to enter pipeline mode: %s",
1584
1585 /* One series of three commands, using single-row mode for the first two. */
1586 for (i = 0; i < 3; i++)
1587 {
1588 char *param[1];
1589
1590 param[0] = psprintf("%d", 44 + i);
1591
1593 "SELECT generate_series(42, $1)",
1594 1,
1595 NULL,
1596 (const char **) param,
1597 NULL,
1598 NULL,
1599 0) != 1)
1600 pg_fatal("failed to send query: %s",
1602 pfree(param[0]);
1603 }
1604 if (PQpipelineSync(conn) != 1)
1605 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1606
1607 for (i = 0; !pipeline_ended; i++)
1608 {
1609 bool first = true;
1610 bool saw_ending_tuplesok;
1611 bool isSingleTuple = false;
1612
1613 /* Set single row mode for only first 2 SELECT queries */
1614 if (i < 2)
1615 {
1616 if (PQsetSingleRowMode(conn) != 1)
1617 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1618 }
1619
1620 /* Consume rows for this query */
1621 saw_ending_tuplesok = false;
1622 while ((res = PQgetResult(conn)) != NULL)
1623 {
1625
1626 if (est == PGRES_PIPELINE_SYNC)
1627 {
1628 fprintf(stderr, "end of pipeline reached\n");
1629 pipeline_ended = true;
1630 PQclear(res);
1631 if (i != 3)
1632 pg_fatal("Expected three results, got %d", i);
1633 break;
1634 }
1635
1636 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1637 if (first)
1638 {
1639 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1640 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1641 i, PQresStatus(est));
1642 if (i >= 2 && est != PGRES_TUPLES_OK)
1643 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1644 i, PQresStatus(est));
1645 first = false;
1646 }
1647
1648 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1649 switch (est)
1650 {
1651 case PGRES_TUPLES_OK:
1652 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1653 saw_ending_tuplesok = true;
1654 if (isSingleTuple)
1655 {
1656 if (PQntuples(res) == 0)
1657 fprintf(stderr, "all tuples received in query %d\n", i);
1658 else
1659 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1660 }
1661 break;
1662
1663 case PGRES_SINGLE_TUPLE:
1664 isSingleTuple = true;
1665 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1666 break;
1667
1668 default:
1669 pg_fatal("unexpected");
1670 }
1671 PQclear(res);
1672 }
1673 if (!pipeline_ended && !saw_ending_tuplesok)
1674 pg_fatal("didn't get expected terminating TUPLES_OK");
1675 }
1676
1677 /*
1678 * Now issue one command, get its results in with single-row mode, then
1679 * issue another command, and get its results in normal mode; make sure
1680 * the single-row mode flag is reset as expected.
1681 */
1682 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1683 0, NULL, NULL, NULL, NULL, 0) != 1)
1684 pg_fatal("failed to send query: %s",
1686 if (PQsendFlushRequest(conn) != 1)
1687 pg_fatal("failed to send flush request");
1688 if (PQsetSingleRowMode(conn) != 1)
1689 pg_fatal("PQsetSingleRowMode() failed");
1690 res = PQgetResult(conn);
1691 if (res == NULL)
1692 pg_fatal("unexpected NULL");
1694 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1696 res = PQgetResult(conn);
1697 if (res == NULL)
1698 pg_fatal("unexpected NULL");
1700 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1702 if (PQgetResult(conn) != NULL)
1703 pg_fatal("expected NULL result");
1704
1705 if (PQsendQueryParams(conn, "SELECT 1",
1706 0, NULL, NULL, NULL, NULL, 0) != 1)
1707 pg_fatal("failed to send query: %s",
1709 if (PQsendFlushRequest(conn) != 1)
1710 pg_fatal("failed to send flush request");
1711 res = PQgetResult(conn);
1712 if (res == NULL)
1713 pg_fatal("unexpected NULL");
1715 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1717 if (PQgetResult(conn) != NULL)
1718 pg_fatal("expected NULL result");
1719
1720 /*
1721 * Try chunked mode as well; make sure that it correctly delivers a
1722 * partial final chunk.
1723 */
1724 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1725 0, NULL, NULL, NULL, NULL, 0) != 1)
1726 pg_fatal("failed to send query: %s",
1728 if (PQsendFlushRequest(conn) != 1)
1729 pg_fatal("failed to send flush request");
1730 if (PQsetChunkedRowsMode(conn, 3) != 1)
1731 pg_fatal("PQsetChunkedRowsMode() failed");
1732 res = PQgetResult(conn);
1733 if (res == NULL)
1734 pg_fatal("unexpected NULL");
1736 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1739 if (PQntuples(res) != 3)
1740 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1741 res = PQgetResult(conn);
1742 if (res == NULL)
1743 pg_fatal("unexpected NULL");
1745 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1747 if (PQntuples(res) != 2)
1748 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1749 res = PQgetResult(conn);
1750 if (res == NULL)
1751 pg_fatal("unexpected NULL");
1753 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1755 if (PQntuples(res) != 0)
1756 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1757 if (PQgetResult(conn) != NULL)
1758 pg_fatal("expected NULL result");
1759
1760 if (PQexitPipelineMode(conn) != 1)
1761 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1762
1763 fprintf(stderr, "ok\n");
1764}
1765
1766/*
1767 * Simple test to verify that a pipeline is discarded as a whole when there's
1768 * an error, ignoring transaction commands.
1769 */
1770static void
1772{
1773 PGresult *res;
1774 bool expect_null;
1775 int num_syncs = 0;
1776
1777 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1778 "CREATE TABLE pq_pipeline_tst (id int)");
1780 pg_fatal("failed to create test table: %s",
1782 PQclear(res);
1783
1784 if (PQenterPipelineMode(conn) != 1)
1785 pg_fatal("failed to enter pipeline mode: %s",
1787 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1788 pg_fatal("could not send prepare on pipeline: %s",
1790
1792 "BEGIN",
1793 0, NULL, NULL, NULL, NULL, 0) != 1)
1794 pg_fatal("failed to send query: %s",
1797 "SELECT 0/0",
1798 0, NULL, NULL, NULL, NULL, 0) != 1)
1799 pg_fatal("failed to send query: %s",
1801
1802 /*
1803 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1804 * get out of the pipeline-aborted state first.
1805 */
1806 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1807 pg_fatal("failed to execute prepared: %s",
1809
1810 /* This insert fails because we're in pipeline-aborted state */
1812 "INSERT INTO pq_pipeline_tst VALUES (1)",
1813 0, NULL, NULL, NULL, NULL, 0) != 1)
1814 pg_fatal("failed to send query: %s",
1816 if (PQpipelineSync(conn) != 1)
1817 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1818 num_syncs++;
1819
1820 /*
1821 * This insert fails even though the pipeline got a SYNC, because we're in
1822 * an aborted transaction
1823 */
1825 "INSERT INTO pq_pipeline_tst VALUES (2)",
1826 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 pg_fatal("failed to send query: %s",
1829 if (PQpipelineSync(conn) != 1)
1830 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1831 num_syncs++;
1832
1833 /*
1834 * Send ROLLBACK using prepared stmt. This one works because we just did
1835 * PQpipelineSync above.
1836 */
1837 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1838 pg_fatal("failed to execute prepared: %s",
1840
1841 /*
1842 * Now that we're out of a transaction and in pipeline-good mode, this
1843 * insert works
1844 */
1846 "INSERT INTO pq_pipeline_tst VALUES (3)",
1847 0, NULL, NULL, NULL, NULL, 0) != 1)
1848 pg_fatal("failed to send query: %s",
1850 /* Send two syncs now -- match up to SYNC messages below */
1851 if (PQpipelineSync(conn) != 1)
1852 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1853 num_syncs++;
1854 if (PQpipelineSync(conn) != 1)
1855 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1856 num_syncs++;
1857
1858 expect_null = false;
1859 for (int i = 0;; i++)
1860 {
1861 ExecStatusType restype;
1862
1863 res = PQgetResult(conn);
1864 if (res == NULL)
1865 {
1866 printf("%d: got NULL result\n", i);
1867 if (!expect_null)
1868 pg_fatal("did not expect NULL here");
1869 expect_null = false;
1870 continue;
1871 }
1872 restype = PQresultStatus(res);
1873 printf("%d: got status %s", i, PQresStatus(restype));
1874 if (expect_null)
1875 pg_fatal("expected NULL");
1876 if (restype == PGRES_FATAL_ERROR)
1877 printf("; error: %s", PQerrorMessage(conn));
1878 else if (restype == PGRES_PIPELINE_ABORTED)
1879 {
1880 printf(": command didn't run because pipeline aborted\n");
1881 }
1882 else
1883 printf("\n");
1884 PQclear(res);
1885
1886 if (restype == PGRES_PIPELINE_SYNC)
1887 num_syncs--;
1888 else
1889 expect_null = true;
1890 if (num_syncs <= 0)
1891 break;
1892 }
1893 if (PQgetResult(conn) != NULL)
1894 pg_fatal("returned something extra after all the syncs: %s",
1896
1897 if (PQexitPipelineMode(conn) != 1)
1898 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1899
1900 /* We expect to find one tuple containing the value "3" */
1901 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1903 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1904 if (PQntuples(res) != 1)
1905 pg_fatal("did not get 1 tuple");
1906 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1907 pg_fatal("did not get expected tuple");
1908 PQclear(res);
1909
1910 fprintf(stderr, "ok\n");
1911}
1912
1913/*
1914 * In this test mode we send a stream of queries, with one in the middle
1915 * causing an error. Verify that we can still send some more after the
1916 * error and have libpq work properly.
1917 */
1918static void
1920{
1921 int sock = PQsocket(conn);
1922 PGresult *res;
1923 Oid paramTypes[2] = {INT8OID, INT8OID};
1924 const char *paramValues[2];
1925 char paramValue0[MAXINT8LEN];
1926 char paramValue1[MAXINT8LEN];
1927 int ctr = 0;
1928 int numsent = 0;
1929 int results = 0;
1930 bool read_done = false;
1931 bool write_done = false;
1932 bool error_sent = false;
1933 bool got_error = false;
1934 int switched = 0;
1935 int socketful = 0;
1936 fd_set in_fds;
1937 fd_set out_fds;
1938
1939 fprintf(stderr, "uniqviol ...");
1940
1942
1943 paramValues[0] = paramValue0;
1944 paramValues[1] = paramValue1;
1945 sprintf(paramValue1, "42");
1946
1947 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1948 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1950 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1951
1952 res = PQexec(conn, "begin");
1954 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1955
1956 res = PQprepare(conn, "insertion",
1957 "insert into ppln_uniqviol values ($1, $2) returning id",
1958 2, paramTypes);
1959 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1960 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1961
1962 if (PQenterPipelineMode(conn) != 1)
1963 pg_fatal("failed to enter pipeline mode");
1964
1965 while (!read_done)
1966 {
1967 /*
1968 * Avoid deadlocks by reading everything the server has sent before
1969 * sending anything. (Special precaution is needed here to process
1970 * PQisBusy before testing the socket for read-readiness, because the
1971 * socket does not turn read-ready after "sending" queries in aborted
1972 * pipeline mode.)
1973 */
1974 while (PQisBusy(conn) == 0)
1975 {
1976 bool new_error;
1977
1978 if (results >= numsent)
1979 {
1980 if (write_done)
1981 read_done = true;
1982 break;
1983 }
1984
1985 res = PQgetResult(conn);
1986 new_error = process_result(conn, res, results, numsent);
1987 if (new_error && got_error)
1988 pg_fatal("got two errors");
1989 got_error |= new_error;
1990 if (results++ >= numsent - 1)
1991 {
1992 if (write_done)
1993 read_done = true;
1994 break;
1995 }
1996 }
1997
1998 if (read_done)
1999 break;
2000
2001 FD_ZERO(&out_fds);
2002 FD_SET(sock, &out_fds);
2003
2004 FD_ZERO(&in_fds);
2005 FD_SET(sock, &in_fds);
2006
2007 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2008 {
2009 if (errno == EINTR)
2010 continue;
2011 pg_fatal("select() failed: %m");
2012 }
2013
2014 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2015 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2016
2017 /*
2018 * If the socket is writable and we haven't finished sending queries,
2019 * send some.
2020 */
2021 if (!write_done && FD_ISSET(sock, &out_fds))
2022 {
2023 for (;;)
2024 {
2025 int flush;
2026
2027 /*
2028 * provoke uniqueness violation exactly once after having
2029 * switched to read mode.
2030 */
2031 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2032 {
2033 sprintf(paramValue0, "%d", numsent / 2);
2034 fprintf(stderr, "E");
2035 error_sent = true;
2036 }
2037 else
2038 {
2039 fprintf(stderr, ".");
2040 sprintf(paramValue0, "%d", ctr++);
2041 }
2042
2043 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2044 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2045 numsent++;
2046
2047 /* Are we done writing? */
2048 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2049 {
2050 if (PQsendFlushRequest(conn) != 1)
2051 pg_fatal("failed to send flush request");
2052 write_done = true;
2053 fprintf(stderr, "\ndone writing\n");
2054 PQflush(conn);
2055 break;
2056 }
2057
2058 /* is the outgoing socket full? */
2059 flush = PQflush(conn);
2060 if (flush == -1)
2061 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2062 if (flush == 1)
2063 {
2064 if (socketful == 0)
2065 socketful = numsent;
2066 fprintf(stderr, "\nswitch to reading\n");
2067 switched++;
2068 break;
2069 }
2070 }
2071 }
2072 }
2073
2074 if (!got_error)
2075 pg_fatal("did not get expected error");
2076
2077 fprintf(stderr, "ok\n");
2078}
2079
2080/*
2081 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2082 * the expected NULL that should follow it.
2083 *
2084 * Returns true if we read a fatal error message, otherwise false.
2085 */
2086static bool
2087process_result(PGconn *conn, PGresult *res, int results, int numsent)
2088{
2089 PGresult *res2;
2090 bool got_error = false;
2091
2092 if (res == NULL)
2093 pg_fatal("got unexpected NULL");
2094
2095 switch (PQresultStatus(res))
2096 {
2097 case PGRES_FATAL_ERROR:
2098 got_error = true;
2099 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2100 PQclear(res);
2101
2102 res2 = PQgetResult(conn);
2103 if (res2 != NULL)
2104 pg_fatal("expected NULL, got %s",
2106 break;
2107
2108 case PGRES_TUPLES_OK:
2109 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2110 PQclear(res);
2111
2112 res2 = PQgetResult(conn);
2113 if (res2 != NULL)
2114 pg_fatal("expected NULL, got %s",
2116 break;
2117
2119 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2120 res2 = PQgetResult(conn);
2121 if (res2 != NULL)
2122 pg_fatal("expected NULL, got %s",
2124 break;
2125
2126 default:
2127 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2128 }
2129
2130 return got_error;
2131}
2132
2133
/*
 * Print a short usage summary to stderr.
 *
 * "progname" is interpolated into the banner line and into both synopsis
 * lines; the option list that follows is fixed text.
 */
static void
usage(const char *progname)
{
	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
	fprintf(stderr, "Usage:\n");
	fprintf(stderr, " %s [OPTION] tests\n", progname);
	fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
	fprintf(stderr, "\nOptions:\n");
	fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
	fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
}
2145
2146static void
2148{
2149 printf("cancel\n");
2150 printf("disallowed_in_pipeline\n");
2151 printf("multi_pipelines\n");
2152 printf("nosync\n");
2153 printf("pipeline_abort\n");
2154 printf("pipeline_idle\n");
2155 printf("pipelined_insert\n");
2156 printf("prepared\n");
2157 printf("simple_pipeline\n");
2158 printf("singlerow\n");
2159 printf("transaction\n");
2160 printf("uniqviol\n");
2161}
2162
2163int
2164main(int argc, char **argv)
2165{
2166 const char *conninfo = "";
2167 PGconn *conn;
2168 FILE *trace;
2169 char *testname;
2170 int numrows = 10000;
2171 PGresult *res;
2172 int c;
2173
2174 while ((c = getopt(argc, argv, "r:t:")) != -1)
2175 {
2176 switch (c)
2177 {
2178 case 'r': /* numrows */
2179 errno = 0;
2180 numrows = strtol(optarg, NULL, 10);
2181 if (errno != 0 || numrows <= 0)
2182 {
2183 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2184 optarg);
2185 exit(1);
2186 }
2187 break;
2188 case 't': /* trace file */
2190 break;
2191 }
2192 }
2193
2194 if (optind < argc)
2195 {
2196 testname = pg_strdup(argv[optind]);
2197 optind++;
2198 }
2199 else
2200 {
2201 usage(argv[0]);
2202 exit(1);
2203 }
2204
2205 if (strcmp(testname, "tests") == 0)
2206 {
2208 exit(0);
2209 }
2210
2211 if (optind < argc)
2212 {
2213 conninfo = pg_strdup(argv[optind]);
2214 optind++;
2215 }
2216
2217 /* Make a connection to the database */
2218 conn = PQconnectdb(conninfo);
2219 if (PQstatus(conn) != CONNECTION_OK)
2220 {
2221 fprintf(stderr, "Connection to database failed: %s\n",
2224 }
2225
2226 res = PQexec(conn, "SET lc_messages TO \"C\"");
2228 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2229 res = PQexec(conn, "SET debug_parallel_query = off");
2231 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2232
2233 /* Set the trace file, if requested */
2234 if (tracefile != NULL)
2235 {
2236 if (strcmp(tracefile, "-") == 0)
2237 trace = stdout;
2238 else
2239 trace = fopen(tracefile, "w");
2240 if (trace == NULL)
2241 pg_fatal("could not open file \"%s\": %m", tracefile);
2242
2243 /* Make it line-buffered */
2244 setvbuf(trace, NULL, PG_IOLBF, 0);
2245
2246 PQtrace(conn, trace);
2249 }
2250
2251 if (strcmp(testname, "cancel") == 0)
2253 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2255 else if (strcmp(testname, "multi_pipelines") == 0)
2257 else if (strcmp(testname, "nosync") == 0)
2259 else if (strcmp(testname, "pipeline_abort") == 0)
2261 else if (strcmp(testname, "pipeline_idle") == 0)
2263 else if (strcmp(testname, "pipelined_insert") == 0)
2264 test_pipelined_insert(conn, numrows);
2265 else if (strcmp(testname, "prepared") == 0)
2267 else if (strcmp(testname, "simple_pipeline") == 0)
2269 else if (strcmp(testname, "singlerow") == 0)
2271 else if (strcmp(testname, "transaction") == 0)
2273 else if (strcmp(testname, "uniqviol") == 0)
2275 else
2276 {
2277 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2278 exit(1);
2279 }
2280
2281 /* close the connection to the database and cleanup */
2282 PQfinish(conn);
2283 return 0;
2284}
#define lengthof(array)
Definition: c.h:742
static PGcancel *volatile cancelConn
Definition: cancel.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:349
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:318
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:283
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:171
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:463
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:334
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:661
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:417
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:306
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:746
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6980
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7146
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4880
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7243
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7251
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7388
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7209
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7235
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:691
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2455
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3073
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2569
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3042
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2521
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2539
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2556
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3282
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2491
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3944
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1536
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2474
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1965
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3272
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2504
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3419
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3371
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3983
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
static struct @161 value
long val
Definition: informix.c:689
int i
Definition: isn.c:72
static const JsonPathKeyword keywords[]
@ CONNECTION_OK
Definition: libpq-fe.h:81
ExecStatusType
Definition: libpq-fe.h:118
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:137
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:131
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:133
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:134
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:135
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:467
PostgresPollingStatusType
Definition: libpq-fe.h:109
@ PGRES_POLLING_OK
Definition: libpq-fe.h:113
@ PGRES_POLLING_READING
Definition: libpq-fe.h:111
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:112
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:182
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:184
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:469
static void usage(const char *progname)
static void const char * fmt
#define MAXINT8LEN
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void confirm_query_canceled_impl(int line, PGconn *conn)
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
static void const char fflush(stdout)
va_end(args)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
#define confirm_query_canceled(conn)
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
exit(1)
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
va_start(args, fmt)
void pfree(void *pointer)
Definition: mcxt.c:1521
static AmcheckOptions opts
Definition: pg_amcheck.c:112
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:51
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:72
PGDLLIMPORT char * optarg
Definition: getopt.c:53
#define PG_IOLBF
Definition: port.h:361
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void pg_usleep(long microsec)
Definition: signal.c:53
PGconn * conn
Definition: streamutil.c:53
const char * keyword
Definition: regguts.h:323
const char * description
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513