PostgreSQL Source Code git master
Loading...
Searching...
No Matches
libpq_pipeline.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <sys/select.h>
19#include <sys/time.h>
20
21#include "catalog/pg_type_d.h"
22#include "libpq-fe.h"
23#include "pg_getopt.h"
24
25
/*
 * Forward declarations of file-local (static) helpers.
 *
 * NOTE(review): the pg_fatal_impl declaration on listing line 27 is not
 * terminated here; listing line 28 is absent from this extraction and
 * presumably carries the rest of the declaration -- confirm against the
 * real source.
 */
26static void exit_nicely(PGconn *conn);
27pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
29static bool process_result(PGconn *conn, PGresult *res, int results,
30 int numsent);
31
33
34/* Options and defaults */
35static char *tracefile = NULL; /* path to PQtrace() file */
36
37
/*
 * pg_debug() forwards its arguments to fprintf(stderr, ...) when DEBUG_OUTPUT
 * is defined at compile time; otherwise it expands to nothing, so its
 * arguments are not evaluated and must not carry side effects.
 */
38#ifdef DEBUG_OUTPUT
39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40#else
41#define pg_debug(...)
42#endif
43
/* SQL statements operating on the pq_pipeline_demo table. */
44static const char *const drop_table_sql =
45"DROP TABLE IF EXISTS pq_pipeline_demo";
46static const char *const create_table_sql =
47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48"int8filler int8);";
49static const char *const insert_sql =
50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51static const char *const insert_sql2 =
52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53
54/* max char length of an int32/64, plus sign and null terminator */
/*
 * MAXINTLEN (12) fits "-2147483648" (11 characters) plus the terminator.
 *
 * NOTE(review): MAXINT8LEN (20) does NOT fit the most negative int64,
 * "-9223372036854775808" (20 characters), plus the terminator; that would
 * need 21 bytes.  The only value formatted with MAXINT8LEN in this listing
 * is 1LL << 62 (19 digits), which does fit.
 */
55#define MAXINTLEN 12
56#define MAXINT8LEN 20
57
/*
 * Terminate the test program with exit status 1.
 *
 * NOTE(review): listing lines 59 and 61 (the function name/parameter line
 * and one body statement) are missing from this extraction.  Going by the
 * prototype near the top of the file this is presumably
 * exit_nicely(PGconn *conn) -- confirm against the real source.
 */
58static void
60{
62 exit(1);
63}
64
65/*
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
68 */
69
70/*
71 * Print an error to stderr and terminate the program.
 *
 * pg_fatal(...) is the entry point; it supplies __LINE__ of the call site
 * as the "line" argument of pg_fatal_impl().
 *
 * line: source line number printed in the "progname:line: " prefix.
 * fmt:  printf-style format for the message; it must not end in a newline
 *       (asserted below), because one is appended here.
 *
 * Never returns: the process exits with status 1.
72 */
73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74pg_noreturn static void
75pg_fatal_impl(int line, const char *fmt,...)
76{
77 va_list args;
78
 /* flush stdout before anything is written to stderr */
79 fflush(stdout);
80
 /* progname is a file-level variable whose declaration is not in this listing */
81 fprintf(stderr, "\n%s:%d: ", progname, line);
82 va_start(args, fmt);
83 vfprintf(stderr, fmt, args);
84 va_end(args);
 /*
  * NOTE(review): this indexes fmt[strlen(fmt) - 1], so it assumes fmt is a
  * non-empty string.
  */
85 Assert(fmt[strlen(fmt) - 1] != '\n');
86 fprintf(stderr, "\n");
87 exit(1);
88}
89
90/*
91 * Check that libpq next returns a PGresult with the specified status,
92 * returning the PGresult so that caller can perform additional checks.
 *
 * The confirm_result_status() macro supplies __LINE__ of the call site, which
 * is passed on to pg_fatal_impl() so failures are reported at the caller's
 * line.  Both a NULL result and a status mismatch are fatal.
 *
 * The returned PGresult is not cleared here.
 *
 * NOTE(review): listing lines 96, 103, 106 and 108 (the name/parameter line
 * and the trailing arguments of both pg_fatal_impl() calls) are missing from
 * this extraction.
93 */
94#define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
95static PGresult *
97{
98 PGresult *res;
99
100 res = PQgetResult(conn);
101 if (res == NULL)
102 pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
104 if (PQresultStatus(res) != status)
105 pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
107 PQresStatus(status),
109 return res;
110}
111
112/*
113 * Check that libpq next returns a PGresult with the specified status,
114 * then free the PGresult.
 *
 * Thin wrapper: delegates the check to confirm_result_status_impl() with the
 * caller's line number, then releases the result with PQclear().
 *
 * NOTE(review): listing line 118 (the name/parameter line) is missing from
 * this extraction.
115 */
116#define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
117static void
119{
120 PGresult *res;
121
122 res = confirm_result_status_impl(line, conn, status);
123 PQclear(res);
124}
125
126/*
127 * Check that libpq next returns a null PGresult.
 *
 * A non-NULL result from PQgetResult() is fatal, reported at the line of the
 * consume_null_result() invocation.
 *
 * NOTE(review): listing lines 131, 138 and 139 (the name/parameter line and
 * the trailing arguments of the pg_fatal_impl() call) are missing from this
 * extraction.
128 */
129#define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
130static void
132{
133 PGresult *res;
134
135 res = PQgetResult(conn);
136 if (res != NULL)
137 pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
140}
141
142/*
143 * Check that the query on the given connection got canceled.
 *
 * The result's SQLSTATE must be "57014"; any other value is fatal, reported
 * at the line of the consume_query_cancel() invocation.
 *
 * NOTE(review): listing lines 147, 151, 154 and 158 are missing from this
 * extraction: the name/parameter line, the statement that assigns "res", the
 * tail of the pg_fatal_impl() call and the body of the PQisBusy() loop.
144 */
145#define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
146static void
148{
149 PGresult *res;
150
 /*
  * NOTE(review): PQresultErrorField() returns NULL when the result carries
  * no such field, and strcmp() must not be handed NULL.  Presumably the
  * missing line above guarantees an error result -- confirm.
  */
152 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
153 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
155 PQclear(res);
156
 /* loop while libpq reports the connection as busy (loop body not in listing) */
157 while (PQisBusy(conn))
159}
160
161/*
162 * Using monitorConn, query pg_stat_activity to see that the connection with
163 * the given PID is either in the given state, or waiting on the given event
164 * (only one of them can be given).
 *
 * Polls every 10ms and returns only once the count query reports something
 * other than "0"; there is no timeout in the visible code.  Query failures
 * and unexpected result shapes are fatal, reported with the "line" value.
 *
 * NOTE(review): listing lines 167, 185, 190 and 195 are missing from this
 * extraction: the first line of the parameter list, both calls that assign
 * "res", and the condition guarding the "could not query" error.
165 */
166static void
168 char *state, char *event)
169{
170 const Oid paramTypes[] = {INT4OID, TEXTOID};
171 const char *paramValues[2];
172 char *pidstr = psprintf("%d", procpid);
173
 /* exactly one of state / event must be provided */
174 Assert((state == NULL) ^ (event == NULL));
175
176 paramValues[0] = pidstr;
177 paramValues[1] = state ? state : event;
178
179 while (true)
180 {
181 PGresult *res;
182 char *value;
183
 /* same count(*) query either way; only the filtered column differs */
184 if (state != NULL)
186 "SELECT count(*) FROM pg_stat_activity WHERE "
187 "pid = $1 AND state = $2",
188 2, paramTypes, paramValues, NULL, NULL, 0);
189 else
191 "SELECT count(*) FROM pg_stat_activity WHERE "
192 "pid = $1 AND wait_event = $2",
193 2, paramTypes, paramValues, NULL, NULL, 0);
194
196 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
197 if (PQntuples(res) != 1)
198 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
199 if (PQnfields(res) != 1)
200 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
201 value = PQgetvalue(res, 0, 0);
 /* any count other than "0" means a matching row exists: stop polling */
202 if (strcmp(value, "0") != 0)
203 {
204 PQclear(res);
205 break;
206 }
207 PQclear(res);
208
209 /* wait 10ms before polling again */
210 pg_usleep(10000);
211 }
212
 /* release the string allocated by psprintf() above */
213 pfree(pidstr);
214}
215
/*
 * Send a "SELECT pg_sleep($1)" query on conn that a later cancel request can
 * interrupt.
 *
 * The sleep duration is the PG_TEST_TIMEOUT_DEFAULT environment variable if
 * set, otherwise "180"; it is passed as text for an INT4OID parameter.
 * A failure to send is fatal, reported at the line of the
 * send_cancellable_query() invocation.
 *
 * NOTE(review): listing lines 219, 229 and 243 are missing from this
 * extraction: the name/parameter line and the statements following each of
 * the two "Wait for ..." comments.
 */
216#define send_cancellable_query(conn, monitorConn) \
217 send_cancellable_query_impl(__LINE__, conn, monitorConn)
218static void
220{
221 const char *env_wait;
222 const Oid paramTypes[1] = {INT4OID};
223
224 /*
225 * Wait for the connection to be idle, so that our check for an active
226 * connection below is reliable, instead of possibly seeing an outdated
227 * state.
228 */
230
231 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
232 if (env_wait == NULL)
233 env_wait = "180";
234
 /* &env_wait serves as the one-element parameter value array */
235 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
236 &env_wait, NULL, NULL, 0) != 1)
237 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
238
239 /*
240 * Wait for the sleep to be active, because if the query is not running
241 * yet, the cancel request that we send won't have any effect.
242 */
244}
245
246/*
247 * Create a new connection with the same conninfo as the given one.
 *
 * Builds NULL-terminated keyword/value arrays from the option list "opts",
 * copying only options whose value is set, and connects with
 * PQconnectdbParams().  Returns the new connection.
 *
 * NOTE(review): listing lines 250, 252-253, 280, 282, 284 and 286 are
 * missing from this extraction, including the name/parameter line, the
 * declarations of "copyConn" and "opts", the condition guarding the
 * "Connection to database failed" error, and part of the cleanup.
248 */
249static PGconn *
251{
254 const char **keywords;
255 const char **vals;
256 int nopts = 0;
257 int i;
258
 /* count every option, set or not; this is an upper bound for the arrays */
259 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
260 nopts++;
261 nopts++; /* for the NULL terminator */
262
263 keywords = pg_malloc(sizeof(char *) * nopts);
264 vals = pg_malloc(sizeof(char *) * nopts);
265
 /* copy only the options that have a value */
266 i = 0;
267 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
268 {
269 if (opt->val)
270 {
271 keywords[i] = opt->keyword;
272 vals[i] = opt->val;
273 i++;
274 }
275 }
 /* terminate both arrays */
276 keywords[i] = vals[i] = NULL;
277
278 copyConn = PQconnectdbParams(keywords, vals, false);
279
281 pg_fatal("Connection to database failed: %s",
283
285 pfree(vals);
287
288 return copyConn;
289}
290
291/*
292 * Test query cancellation routines
 *
 * Going by the section comments below, this covers PQcancel (including reuse
 * of the same PGcancel object), PQrequestCancel, PQcancelBlocking, a
 * PQcancelCreate + PQcancelPoll loop, and reuse of the cancel connection
 * after PQcancelReset.  Any failure is fatal via pg_fatal().
 *
 * NOTE(review): many listing lines are missing from this extraction (the
 * name/parameter line, several declarations, and most of the calls that send
 * queries, issue cancels and consume results), so only the visible
 * statements are annotated.
293 */
294static void
296{
300 char errorbuf[256];
301
302 fprintf(stderr, "test cancellations... ");
303
304 if (PQsetnonblocking(conn, 1) != 0)
305 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
306
307 /*
308 * Make a separate connection to the database to monitor the query on the
309 * main connection.
310 */
313
314 /* test PQcancel */
317 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
318 pg_fatal("failed to run PQcancel: %s", errorbuf);
320
321 /* PGcancel object can be reused for the next query */
323 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
324 pg_fatal("failed to run PQcancel: %s", errorbuf);
326
328
329 /* test PQrequestCancel */
331 if (!PQrequestCancel(conn))
332 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
334
335 /* test PQcancelBlocking */
339 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
342
343 /* test PQcancelCreate and then polling with PQcancelPoll */
347 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
 /*
  * Poll loop: each iteration registers the cancel socket for reading or
  * writing depending on pollres, then waits in select() for up to 3 seconds.
  * The statements that obtain pollres and the loop's exit condition are not
  * in this listing.
  */
348 while (true)
349 {
350 struct timeval tv;
354 int sock = PQcancelSocket(cancelConn);
355
357 break;
358
361 switch (pollres)
362 {
364 pg_debug("polling for reads\n");
365 FD_SET(sock, &input_mask);
366 break;
368 pg_debug("polling for writes\n");
369 FD_SET(sock, &output_mask);
370 break;
371 default:
372 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
373 }
374
 /*
  * NOTE(review): this validity check runs after FD_SET(sock, ...) in the
  * switch above has already used the descriptor; FD_SET with a negative
  * descriptor is undefined.  Consider moving the check ahead of the switch.
  */
375 if (sock < 0)
376 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
377
378 tv.tv_sec = 3;
379 tv.tv_usec = 0;
380
 /* retry select() when interrupted by a signal; other failures are fatal */
381 while (true)
382 {
383 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
384 {
385 if (errno == EINTR)
386 continue;
387 pg_fatal("select() failed: %m");
388 }
389 break;
390 }
391 }
393 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
395
396 /*
397 * test PQcancelReset works on the cancel connection and it can be reused
398 * afterwards
399 */
401
404 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
 /*
  * Second poll loop; the visible statements match the first loop above,
  * including the late "sock < 0" check.
  */
405 while (true)
406 {
407 struct timeval tv;
411 int sock = PQcancelSocket(cancelConn);
412
414 break;
415
418 switch (pollres)
419 {
421 pg_debug("polling for reads\n");
422 FD_SET(sock, &input_mask);
423 break;
425 pg_debug("polling for writes\n");
426 FD_SET(sock, &output_mask);
427 break;
428 default:
429 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
430 }
431
 /* NOTE(review): as above, this check comes after FD_SET() has used sock */
432 if (sock < 0)
433 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
434
435 tv.tv_sec = 3;
436 tv.tv_usec = 0;
437
438 while (true)
439 {
440 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
441 {
442 if (errno == EINTR)
443 continue;
444 pg_fatal("select() failed: %m");
445 }
446 break;
447 }
448 }
450 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
452
455
456 fprintf(stderr, "ok\n");
457}
458
/*
 * Test that calls which are not permitted in pipeline mode fail with the
 * expected error text, and that entering/exiting pipeline mode is a no-op
 * when the connection is already in the requested mode.
 *
 * Visible steps: enter pipeline mode; PQexec and PQsendQuery must fail;
 * re-entering must succeed; PQisBusy must report 0; exit pipeline mode
 * (twice); finally PQexec must work again.
 *
 * NOTE(review): several listing lines are missing from this extraction (the
 * name/parameter line and the conditions guarding a number of the pg_fatal()
 * calls), so some checks below appear without their "if".
 */
459static void
461{
462 PGresult *res = NULL;
463
464 fprintf(stderr, "test error cases... ");
465
467 pg_fatal("Expected blocking connection mode");
468
469 if (PQenterPipelineMode(conn) != 1)
470 pg_fatal("Unable to enter pipeline mode");
471
473 pg_fatal("Pipeline mode not activated properly");
474
475 /* PQexec should fail in pipeline mode */
476 res = PQexec(conn, "SELECT 1");
478 pg_fatal("PQexec should fail in pipeline mode but succeeded");
480 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
481 pg_fatal("did not get expected error message; got: \"%s\"",
483 PQclear(res);
484
485 /* PQsendQuery should fail in pipeline mode */
486 if (PQsendQuery(conn, "SELECT 1") != 0)
487 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
489 "PQsendQuery not allowed in pipeline mode\n") != 0)
490 pg_fatal("did not get expected error message; got: \"%s\"",
492
493 /* Entering pipeline mode when already in pipeline mode is OK */
494 if (PQenterPipelineMode(conn) != 1)
495 pg_fatal("re-entering pipeline mode should be a no-op but failed");
496
497 if (PQisBusy(conn) != 0)
498 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
499
500 /* ok, back to normal command mode */
501 if (PQexitPipelineMode(conn) != 1)
502 pg_fatal("couldn't exit idle empty pipeline mode");
503
505 pg_fatal("Pipeline mode not terminated properly");
506
507 /* exiting pipeline mode when not in pipeline mode should be a no-op */
508 if (PQexitPipelineMode(conn) != 1)
509 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
510
511 /* can now PQexec again */
512 res = PQexec(conn, "SELECT 1");
514 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
516 PQclear(res);
517
518 fprintf(stderr, "ok\n");
519}
520
/*
 * Test several pipelines queued back to back on one connection.
 *
 * Three "SELECT $1" queries are dispatched, each followed by a sync point:
 * PQpipelineSync() for the first and third, PQsendPipelineSync() for the
 * second (the existing comment describes that one as skipping the flush).
 * While results are still pending, PQexitPipelineMode() must fail (return 0);
 * after everything has been consumed it must succeed.
 *
 * NOTE(review): the name/parameter line, the declaration of
 * dummy_param_oids and the statements that consume each pipeline's results
 * are missing from this extraction.
 */
521static void
523{
524 const char *dummy_params[1] = {"1"};
526
527 fprintf(stderr, "multi pipeline... ");
528
529 /*
530 * Queue up a couple of small pipelines and process each without returning
531 * to command mode first.
532 */
533 if (PQenterPipelineMode(conn) != 1)
534 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
535
536 /* first pipeline */
537 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
538 dummy_params, NULL, NULL, 0) != 1)
539 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
540
541 if (PQpipelineSync(conn) != 1)
542 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
543
544 /* second pipeline */
545 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
546 dummy_params, NULL, NULL, 0) != 1)
547 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
548
549 /* Skip flushing once. */
550 if (PQsendPipelineSync(conn) != 1)
551 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
552
553 /* third pipeline */
554 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
555 dummy_params, NULL, NULL, 0) != 1)
556 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
557
558 if (PQpipelineSync(conn) != 1)
559 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
560
561 /* OK, start processing the results */
562
563 /* first pipeline */
565
567
 /* results are still pending, so leaving pipeline mode must be refused */
568 if (PQexitPipelineMode(conn) != 0)
569 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
570
572
573 /* second pipeline */
575
577
 /* still not fully drained: exiting must be refused again */
578 if (PQexitPipelineMode(conn) != 0)
579 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
580
582
583 /* third pipeline */
585
587
589
590 /* We're still in pipeline mode ... */
592 pg_fatal("Fell out of pipeline mode somehow");
593
594 /* until we end it, which we can safely do now */
595 if (PQexitPipelineMode(conn) != 1)
596 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
598
600 pg_fatal("exiting pipeline mode didn't seem to work");
601
602 fprintf(stderr, "ok\n");
603}
604
605/*
606 * Test behavior when a pipeline dispatches a number of commands that are
607 * not flushed by a sync point.
 *
 * Sends numqueries (10) SELECTs in pipeline mode without any sync, doing a
 * zero-timeout select() after each send to consume input that has already
 * arrived, then sends a flush request and reads results until numqueries of
 * them have been counted.
 *
 * NOTE(review): the name/parameter line, the declaration and clearing of
 * input_mask, the statement after the select() failure message and the
 * result-consuming statements in the final loop are missing from this
 * extraction.
608 */
609static void
611{
612 int numqueries = 10;
613 int results = 0;
614 int sock = PQsocket(conn);
615
616 fprintf(stderr, "nosync... ");
617
618 if (sock < 0)
619 pg_fatal("invalid socket");
620
621 if (PQenterPipelineMode(conn) != 1)
622 pg_fatal("could not enter pipeline mode");
623 for (int i = 0; i < numqueries; i++)
624 {
626 struct timeval tv;
627
628 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
629 0, NULL, NULL, NULL, NULL, 0) != 1)
630 pg_fatal("error sending select: %s", PQerrorMessage(conn));
631 PQflush(conn);
632
633 /*
634 * If the server has written anything to us, read (some of) it now.
635 */
637 FD_SET(sock, &input_mask);
 /* zero timeout: select() only polls, it never blocks here */
638 tv.tv_sec = 0;
639 tv.tv_usec = 0;
640 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
641 {
642 fprintf(stderr, "select() failed: %m\n");
644 }
645 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
646 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
647 }
648
649 /* tell server to flush its output buffer */
650 if (PQsendFlushRequest(conn) != 1)
651 pg_fatal("failed to send flush request");
652 PQflush(conn);
653
654 /* Now read all results */
655 for (;;)
656 {
657 /* We expect exactly one TUPLES_OK result for each query we sent */
659
660 /* and one NULL result should follow each */
662
663 results++;
664
665 /* if we're done, we're done */
666 if (results == numqueries)
667 break;
668 }
669
670 fprintf(stderr, "ok\n");
671}
672
673/*
674 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
675 * still have to get results for each pipeline item, but the item will just be
676 * a PGRES_PIPELINE_ABORTED code.
677 *
678 * This intentionally doesn't use a transaction to wrap the pipeline. You should
679 * usually use an xact, but in this case we want to observe the effects of each
680 * statement.
 *
 * The function issues four PQpipelineSync() calls in total: one per insert
 * pipeline (two), one after the multi-command attempt and one after the
 * division-by-zero query.
 *
 * NOTE(review): the name/parameter line, several declarations, the
 * conditions guarding many pg_fatal() calls and nearly all of the
 * result-consuming statements are missing from this extraction.
 * NOTE(review): "ok" is printed to stderr twice, once midway through and
 * once at the end.
681 */
682static void
684{
685 PGresult *res = NULL;
686 const char *dummy_params[1] = {"1"};
688 int i;
689 int gotrows;
690 bool goterror;
691
692 fprintf(stderr, "aborted pipeline... ");
693
694 res = PQexec(conn, drop_table_sql);
696 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
697 PQclear(res);
698
701 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
702 PQclear(res);
703
704 /*
705 * Queue up a couple of small pipelines and process each without returning
706 * to command mode first. Make sure the second operation in the first
707 * pipeline ERRORs.
708 */
709 if (PQenterPipelineMode(conn) != 1)
710 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
711
712 dummy_params[0] = "1";
714 dummy_params, NULL, NULL, 0) != 1)
715 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
716
 /* the statement name says it: this function does not exist, so it errors */
717 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
719 NULL, NULL, 0) != 1)
720 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
721
722 dummy_params[0] = "2";
724 dummy_params, NULL, NULL, 0) != 1)
725 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
726
727 if (PQpipelineSync(conn) != 1)
728 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
729
730 dummy_params[0] = "3";
732 dummy_params, NULL, NULL, 0) != 1)
733 pg_fatal("dispatching second-pipeline insert failed: %s",
735
736 if (PQpipelineSync(conn) != 1)
737 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
738
739 /*
740 * OK, start processing the pipeline results.
741 *
742 * We should get a command-ok for the first query, then a fatal error and
743 * a pipeline aborted message for the second insert, a pipeline-end, then
744 * a command-ok and a pipeline-ok for the second pipeline operation.
745 */
747
748 /* NULL result to signal end-of-results for this command */
750
751 /* Second query caused error, so we expect an error next */
753
754 /* NULL result to signal end-of-results for this command */
756
757 /*
758 * pipeline should now be aborted.
759 *
760 * Note that we could still queue more queries at this point if we wanted;
761 * they'd get added to a new third pipeline since we've already sent a
762 * second. The aborted flag relates only to the pipeline being received.
763 */
765 pg_fatal("pipeline should be flagged as aborted but isn't");
766
767 /* third query in pipeline, the second insert */
769
770 /* NULL result to signal end-of-results for this command */
772
774 pg_fatal("pipeline should be flagged as aborted but isn't");
775
776 /* Ensure we're still in pipeline */
778 pg_fatal("Fell out of pipeline mode somehow");
779
780 /*
781 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
782 *
783 * (This is so clients know to start processing results normally again and
784 * can tell the difference between skipped commands and the sync.)
785 */
787
789 pg_fatal("sync should've cleared the aborted flag but didn't");
790
791 /* We're still in pipeline mode... */
793 pg_fatal("Fell out of pipeline mode somehow");
794
795 /* the insert from the second pipeline */
797
798 /* Read the NULL result at the end of the command */
800
801 /* the second pipeline sync */
803
804 /* Read the NULL result at the end of the command */
806
807 /* Try to send two queries in one command */
808 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
809 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
810 if (PQpipelineSync(conn) != 1)
811 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
812 goterror = false;
 /* drain this command's results; only an error with SQLSTATE 42601 is accepted */
813 while ((res = PQgetResult(conn)) != NULL)
814 {
815 switch (PQresultStatus(res))
816 {
818 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
819 pg_fatal("expected error about multiple commands, got %s",
821 printf("got expected %s", PQerrorMessage(conn));
822 goterror = true;
823 break;
824 default:
825 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
826 break;
827 }
828 PQclear(res);
829 }
830 if (!goterror)
831 pg_fatal("did not get cannot-insert-multiple-commands error");
832
833 /* sync for the multi-command attempt above (the third PQpipelineSync call in this function) */
835
836 fprintf(stderr, "ok\n");
837
838 /* Test single-row mode with an error partway through */
839 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
840 0, NULL, NULL, NULL, NULL, 0) != 1)
841 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
842 if (PQpipelineSync(conn) != 1)
843 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
845 goterror = false;
846 gotrows = 0;
 /*
  * The series counts 3, 2, 1, 0, -1, so three rows are expected before the
  * division by zero (SQLSTATE 22012) at g = 0.
  */
847 while ((res = PQgetResult(conn)) != NULL)
848 {
849 switch (PQresultStatus(res))
850 {
852 printf("got row: %s\n", PQgetvalue(res, 0, 0));
853 gotrows++;
854 break;
856 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
857 pg_fatal("expected division-by-zero, got: %s (%s)",
860 printf("got expected division-by-zero\n");
861 goterror = true;
862 break;
863 default:
864 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
865 }
866 PQclear(res);
867 }
868 if (!goterror)
869 pg_fatal("did not get division-by-zero error");
870 if (gotrows != 3)
871 pg_fatal("did not get three rows");
872
873 /* sync for the division-by-zero query above (the fourth PQpipelineSync call in this function) */
875
876 /* We're still in pipeline mode... */
878 pg_fatal("Fell out of pipeline mode somehow");
879
880 /* until we end it, which we can safely do now */
881 if (PQexitPipelineMode(conn) != 1)
882 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
884
886 pg_fatal("exiting pipeline mode didn't seem to work");
887
888 /*-
889 * Since we fired the pipelines off without a surrounding xact, the results
890 * should be:
891 *
892 * - Implicit xact started by server around 1st pipeline
893 * - First insert applied
894 * - Second statement aborted xact
895 * - Third insert skipped
896 * - Sync rolled back first implicit xact
897 * - Implicit xact created by server around 2nd pipeline
898 * - insert applied from 2nd pipeline
899 * - Sync commits 2nd xact
900 *
901 * So we should only have the value 3 that we inserted.
902 */
903 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
904
906 pg_fatal("Expected tuples, got %s: %s",
908 if (PQntuples(res) != 1)
909 pg_fatal("expected 1 result, got %d", PQntuples(res));
910 for (i = 0; i < PQntuples(res); i++)
911 {
912 const char *val = PQgetvalue(res, i, 0);
913
914 if (strcmp(val, "3") != 0)
915 pg_fatal("expected only insert with value 3, got %s", val);
916 }
917
918 PQclear(res);
919
920 fprintf(stderr, "ok\n");
921}
922
923/* State machine enum for test_pipelined_insert */
935
/*
 * Pipelined bulk insert driven by two step counters: send_step tracks what
 * has been dispatched and recv_step tracks which result is expected next.
 *
 * Visible flow: enter pipeline mode; dispatch BEGIN, DROP TABLE and CREATE
 * TABLE; send a PREPARE of insert_sql2 as "my_insert"; switch to
 * nonblocking mode; then loop on select(), consuming results whenever the
 * socket is readable and sending more inserts, the COMMIT and the sync
 * whenever it is writable, until recv_step reaches BI_DONE.  Finally exit
 * pipeline mode and clear nonblocking mode.
 *
 * NOTE(review): the name/parameter line, most declarations (including the
 * step enum and the insert parameter buffers), the statements that advance
 * send_step in the first switch, and several call arguments are missing from
 * this extraction.
 */
936static void
938{
940 const char *insert_params[2];
945 int rows_to_send,
947
950
952
953 /*
954 * Do a pipelined insert into a table created at the start of the pipeline
955 */
956 if (PQenterPipelineMode(conn) != 1)
957 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
958
 /* dispatch the setup statements one step at a time until PREPARE is next */
959 while (send_step != BI_PREPARE)
960 {
961 const char *sql;
962
963 switch (send_step)
964 {
965 case BI_BEGIN_TX:
966 sql = "BEGIN TRANSACTION";
968 break;
969
970 case BI_DROP_TABLE:
971 sql = drop_table_sql;
973 break;
974
975 case BI_CREATE_TABLE:
976 sql = create_table_sql;
978 break;
979
980 default:
981 pg_fatal("invalid state");
982 sql = NULL; /* keep compiler quiet */
983 }
984
985 pg_debug("sending: %s\n", sql);
986 if (PQsendQueryParams(conn, sql,
987 0, NULL, NULL, NULL, NULL, 0) != 1)
988 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
989 }
990
992 pg_debug("sending: %s\n", insert_sql2);
993 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
994 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
996
997 /*
998 * Now we start inserting. We'll be sending enough data that we could fill
999 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1000 * mode and consume input while we send more output. As results of each
1001 * query are processed we should pop them to allow processing of the next
1002 * query. There's no need to finish the pipeline before processing
1003 * results.
1004 */
1005 if (PQsetnonblocking(conn, 1) != 0)
1006 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1007
1008 while (recv_step != BI_DONE)
1009 {
1010 int sock;
1013
1014 sock = PQsocket(conn);
1015
1016 if (sock < 0)
1017 break; /* shouldn't happen */
1018
1020 FD_SET(sock, &input_mask);
1022 FD_SET(sock, &output_mask);
1023
 /* NULL timeout: block until the socket is readable or writable */
1024 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1025 {
1026 fprintf(stderr, "select() failed: %m\n");
1028 }
1029
1030 /*
1031 * Process any results, so we keep the server's output buffer free
1032 * flowing and it can continue to process input
1033 */
1034 if (FD_ISSET(sock, &input_mask))
1035 {
1037
1038 /* Read until we'd block if we tried to read */
1039 while (!PQisBusy(conn) && recv_step < BI_DONE)
1040 {
1041 PGresult *res;
1042 const char *cmdtag = "";
1043 const char *description = "";
1044 int status;
1045
1046 /*
1047 * Read next result. If no more results from this query,
1048 * advance to the next query
1049 */
1050 res = PQgetResult(conn);
1051 if (res == NULL)
1052 continue;
1053
 /*
  * Every step expects PGRES_COMMAND_OK except BI_SYNC, which overrides
  * status below.  cmdtag is the expected prefix of the command status;
  * description is only used for debug output when cmdtag is empty.
  */
1054 status = PGRES_COMMAND_OK;
1055 switch (recv_step)
1056 {
1057 case BI_BEGIN_TX:
1058 cmdtag = "BEGIN";
1059 recv_step++;
1060 break;
1061 case BI_DROP_TABLE:
1062 cmdtag = "DROP TABLE";
1063 recv_step++;
1064 break;
1065 case BI_CREATE_TABLE:
1066 cmdtag = "CREATE TABLE";
1067 recv_step++;
1068 break;
1069 case BI_PREPARE:
1070 cmdtag = "";
1071 description = "PREPARE";
1072 recv_step++;
1073 break;
1074 case BI_INSERT_ROWS:
1075 cmdtag = "INSERT";
 /* stay on this step until no rows remain to be received */
1077 if (rows_to_receive == 0)
1078 recv_step++;
1079 break;
1080 case BI_COMMIT_TX:
1081 cmdtag = "COMMIT";
1082 recv_step++;
1083 break;
1084 case BI_SYNC:
1085 cmdtag = "";
1086 description = "SYNC";
1087 status = PGRES_PIPELINE_SYNC;
1088 recv_step++;
1089 break;
1090 case BI_DONE:
1091 /* unreachable */
1092 pg_fatal("unreachable state");
1093 }
1094
1095 if (PQresultStatus(res) != status)
1096 pg_fatal("%s reported status %s, expected %s\n"
1097 "Error message: \"%s\"",
1099 PQresStatus(status), PQerrorMessage(conn));
1100
 /* prefix match only; an empty cmdtag therefore always matches */
1101 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1102 pg_fatal("%s expected command tag '%s', got '%s'",
1104
1105 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1106
1107 PQclear(res);
1108 }
1109 }
1110
1111 /* Write more rows and/or the end pipeline message, if needed */
1112 if (FD_ISSET(sock, &output_mask))
1113 {
1114 PQflush(conn);
1115
1117 {
1119 /* use up some buffer space with a wide value */
 /* 1LL << 62 prints as 19 digits, which fits in MAXINT8LEN (20) bytes */
1120 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1121
1122 if (PQsendQueryPrepared(conn, "my_insert",
1123 2, insert_params, NULL, NULL, 0) == 1)
1124 {
1125 pg_debug("sent row %d\n", rows_to_send);
1126
1127 rows_to_send--;
1128 if (rows_to_send == 0)
1129 send_step++;
1130 }
1131 else
1132 {
1133 /*
1134 * in nonblocking mode, so it's OK for an insert to fail
1135 * to send
1136 */
1137 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1139 }
1140 }
1141 else if (send_step == BI_COMMIT_TX)
1142 {
1143 if (PQsendQueryParams(conn, "COMMIT",
1144 0, NULL, NULL, NULL, NULL, 0) == 1)
1145 {
1146 pg_debug("sent COMMIT\n");
1147 send_step++;
1148 }
1149 else
1150 {
1151 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1153 }
1154 }
1155 else if (send_step == BI_SYNC)
1156 {
1157 if (PQpipelineSync(conn) == 1)
1158 {
 /* unlike the other progress messages, this one goes to stdout unconditionally */
1159 fprintf(stdout, "pipeline sync sent\n");
1160 send_step++;
1161 }
1162 else
1163 {
1164 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1166 }
1167 }
1168 }
1169 }
1170
1171 /* We've got the sync message and the pipeline should be done */
1172 if (PQexitPipelineMode(conn) != 1)
1173 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1175
1176 if (PQsetnonblocking(conn, 0) != 0)
1177 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1178
1179 fprintf(stderr, "ok\n");
1180}
1181
/*
 * Test prepare, describe and close of a prepared statement and of a portal,
 * both through the pipelined PQsend* calls and through the blocking
 * PQdescribe* and PQclose* calls.
 *
 * Visible flow: prepare "select_one" and describe it in a pipeline, checking
 * the column count and each column type against expected_oids; close it in a
 * pipeline; verify a blocking describe then fails while a blocking close
 * still succeeds.  The same sequence is then repeated for a cursor named
 * "cursor_one" declared inside a transaction.
 *
 * NOTE(review): the name/parameter line, the assignments that fill
 * expected_oids, the result-consuming statements and the conditions guarding
 * some pg_fatal() calls are missing from this extraction.
 */
1182static void
1184{
1185 PGresult *res = NULL;
1186 Oid param_oids[1] = {INT4OID};
1187 Oid expected_oids[4];
1188 Oid typ;
1189
1190 fprintf(stderr, "prepared... ");
1191
1192 if (PQenterPipelineMode(conn) != 1)
1193 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1194 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1195 "interval '1 sec'",
1196 1, param_oids) != 1)
1197 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1202 if (PQsendDescribePrepared(conn, "select_one") != 1)
1203 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1204 if (PQpipelineSync(conn) != 1)
1205 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1206
1208
1210
 /* the described statement must have as many columns as expected_oids has entries */
1212 if (PQnfields(res) != lengthof(expected_oids))
1213 pg_fatal("expected %zu columns, got %d",
1215 for (int i = 0; i < PQnfields(res); i++)
1216 {
1217 typ = PQftype(res, i);
1218 if (typ != expected_oids[i])
1219 pg_fatal("field %d: expected type %u, got %u",
1220 i, expected_oids[i], typ);
1221 }
1222 PQclear(res);
1223
1225
1227
1228 fprintf(stderr, "closing statement..");
1229 if (PQsendClosePrepared(conn, "select_one") != 1)
1230 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1231 if (PQpipelineSync(conn) != 1)
1232 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1233
1235
1237
1239
1240 if (PQexitPipelineMode(conn) != 1)
1241 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1242
1243 /* Now that it's closed we should get an error when describing */
1244 res = PQdescribePrepared(conn, "select_one");
1246 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1247 PQclear(res);
1248
1249 /*
1250 * Also test the blocking close, this should not fail since closing a
1251 * non-existent prepared statement is a no-op
1252 */
1253 res = PQclosePrepared(conn, "select_one");
1254 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1256 PQclear(res);
1257
1258 fprintf(stderr, "creating portal... ");
1259
 /* the cursor is declared inside an explicit transaction opened here */
1260 res = PQexec(conn, "BEGIN");
1261 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1262 pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
1263 PQclear(res);
1264
1265 res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1266 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1267 pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
1268 PQclear(res);
1269
1271 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1272 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1273 if (PQpipelineSync(conn) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275
 /* the portal's single column must be of type INT4OID */
1277 typ = PQftype(res, 0);
1278 if (typ != INT4OID)
1279 pg_fatal("portal: expected type %u, got %u",
1280 INT4OID, typ);
1281 PQclear(res);
1282
1284
1286
1287 fprintf(stderr, "closing portal... ");
1288 if (PQsendClosePortal(conn, "cursor_one") != 1)
1289 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1290 if (PQpipelineSync(conn) != 1)
1291 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1292
1294
1296
1298
1299 if (PQexitPipelineMode(conn) != 1)
1300 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1301
1302 /* Now that it's closed we should get an error when describing */
1303 res = PQdescribePortal(conn, "cursor_one");
1305 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1306 PQclear(res);
1307
1308 /*
1309 * Also test the blocking close, this should not fail since closing a
1310 * non-existent portal is a no-op
1311 */
1312 res = PQclosePortal(conn, "cursor_one");
1313 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1314 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1315 PQclear(res);
1316
1317 fprintf(stderr, "ok\n");
1318}
1319
1320/*
1321 * Test max_protocol_version options.
1322 */
1323static void
1325{
1326 const char **keywords;
1327 const char **vals;
1328 int nopts;
1330 int protocol_version;
1332 int i;
1333
1334 /* Prepare keywords/vals arrays, copied from the existing connection. */
1335 nopts = 0;
1336 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1337 nopts++;
1338 nopts++; /* NULL terminator */
1339
1340 keywords = pg_malloc0(sizeof(char *) * nopts);
1341 vals = pg_malloc0(sizeof(char *) * nopts);
1342
1343 i = 0;
1344 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
1345 {
1346 /*
1347 * If the test already specified max_protocol_version, we want to
1348 * replace it rather than attempting to override it. This matters when
1349 * testing defaults, because empty option values at the end of the
1350 * connection string won't replace earlier settings.
1351 */
1352 if (strcmp(opt->keyword, "max_protocol_version") == 0)
1354 else if (!opt->val)
1355 continue;
1356
1357 keywords[i] = opt->keyword;
1358 vals[i] = opt->val;
1359
1360 i++;
1361 }
1362
1364
1365 /*
1366 * Test default protocol_version
1367 */
1368 vals[max_protocol_version_index] = "";
1369 conn = PQconnectdbParams(keywords, vals, false);
1370
1371 if (PQstatus(conn) != CONNECTION_OK)
1372 pg_fatal("Connection to database failed: %s",
1374
1375 protocol_version = PQfullProtocolVersion(conn);
1376 if (protocol_version != 30000)
1377 pg_fatal("expected 30000, got %d", protocol_version);
1378
1379 PQfinish(conn);
1380
1381 /*
1382 * Test max_protocol_version=3.0
1383 */
1384 vals[max_protocol_version_index] = "3.0";
1385 conn = PQconnectdbParams(keywords, vals, false);
1386
1387 if (PQstatus(conn) != CONNECTION_OK)
1388 pg_fatal("Connection to database failed: %s",
1390
1391 protocol_version = PQfullProtocolVersion(conn);
1392 if (protocol_version != 30000)
1393 pg_fatal("expected 30000, got %d", protocol_version);
1394
1395 PQfinish(conn);
1396
1397 /*
1398 * Test max_protocol_version=3.1. It's not valid, we went straight from
1399 * 3.0 to 3.2.
1400 */
1401 vals[max_protocol_version_index] = "3.1";
1402 conn = PQconnectdbParams(keywords, vals, false);
1403
1405 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1406
1407 PQfinish(conn);
1408
1409 /*
1410 * Test max_protocol_version=3.2
1411 */
1412 vals[max_protocol_version_index] = "3.2";
1413 conn = PQconnectdbParams(keywords, vals, false);
1414
1415 if (PQstatus(conn) != CONNECTION_OK)
1416 pg_fatal("Connection to database failed: %s",
1418
1419 protocol_version = PQfullProtocolVersion(conn);
1420 if (protocol_version != 30002)
1421 pg_fatal("expected 30002, got %d", protocol_version);
1422
1423 PQfinish(conn);
1424
1425 /*
1426 * Test max_protocol_version=latest. 'latest' currently means '3.2'.
1427 */
1428 vals[max_protocol_version_index] = "latest";
1429 conn = PQconnectdbParams(keywords, vals, false);
1430
1431 if (PQstatus(conn) != CONNECTION_OK)
1432 pg_fatal("Connection to database failed: %s",
1434
1435 protocol_version = PQfullProtocolVersion(conn);
1436 if (protocol_version != 30002)
1437 pg_fatal("expected 30002, got %d", protocol_version);
1438
1439 PQfinish(conn);
1440
1441 pfree(keywords);
1442 pfree(vals);
1444}
1445
/*
 * Notice-processor callback: count each notice and echo it to stderr.
 *
 * "arg" must point to an int owned by the caller; it is incremented once
 * per call, and the message is printed prefixed with the updated count.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;
	int			seen;

	*counter += 1;
	seen = *counter;

	fprintf(stderr, "NOTICE %d: %s", seen, message);
}
1455
1456 /* Verify behavior in "idle" state */
/*
 * Reviewer summary: in pipeline mode, sends "SELECT 1" and then "SELECT 2",
 * and checks that PQexitPipelineMode() refuses to leave pipeline mode at
 * that point with an error message starting with "cannot exit pipeline
 * mode"; a later PQexitPipelineMode() must succeed.  The test is fatal if
 * n_notices is nonzero after that first part.  The second part sends a
 * pg_advisory_unlock(1,1) query (per the existing comment, to get a WARNING
 * in the middle of a result set) and leaves pipeline mode again.
 *
 * NOTE(review): the embedded listing numbers skip 1458, 1464, 1471, 1473,
 * 1475, 1484-1485, 1487, 1489, 1503 and 1505, so lines appear to be missing
 * from this copy: the function name line (the symbol index at the end of
 * this listing mentions test_pipeline_idle(PGconn *conn)), the tail of one
 * pg_fatal() call, and presumably the statements that read the query
 * results and that hook n_notices up to a notice processor (nothing visible
 * here ever updates it).  Restore them from the upstream file before
 * compiling.
 */
1457 static void
1459 {
1460 int n_notices = 0;
1461
1462 fprintf(stderr, "\npipeline idle...\n");
1463
1465
1466 /* Try to exit pipeline mode in pipeline-idle state */
1467 if (PQenterPipelineMode(conn) != 1)
1468 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1469 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1470 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1472
1474
1476
1477 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1478 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1479 if (PQexitPipelineMode(conn) == 1)
1480 pg_fatal("exiting pipeline succeeded when it shouldn't");
/* Only the leading part of the error text is compared (strncmp over the length of the expected prefix). */
1481 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1482 strlen("cannot exit pipeline mode")) != 0)
1483 pg_fatal("did not get expected error; got: %s",
1486
1488
1490
1491 if (PQexitPipelineMode(conn) != 1)
1492 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1493
1494 if (n_notices > 0)
1495 pg_fatal("got %d notice(s)", n_notices);
1496 fprintf(stderr, "ok - 1\n");
1497
1498 /* Have a WARNING in the middle of a resultset */
1499 if (PQenterPipelineMode(conn) != 1)
1500 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1501 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1502 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1504
1506
1507 if (PQexitPipelineMode(conn) != 1)
1508 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1509 fprintf(stderr, "ok - 2\n");
1510 }
1511
/*
 * Reviewer summary: walks through the basic pipeline life cycle on a
 * blocking connection (the test is fatal if PQisnonblocking() is true).  It
 * enters pipeline mode, dispatches one parameterized "SELECT $1", and checks
 * that PQexitPipelineMode() returns 0 both right after the query is
 * dispatched and again after the query result but before the sync result
 * has been read (per the existing comments).  The final
 * PQexitPipelineMode() must return 1.
 *
 * NOTE(review): the embedded listing numbers skip 1513, 1516, 1535, 1545,
 * 1547, 1556, 1558, 1561, 1567 and 1569, so lines appear to be missing from
 * this copy: the function name line (the symbol index at the end of this
 * listing mentions test_simple_pipeline(PGconn *conn)), a declaration, part
 * of the PQsendQueryParams() argument list, the conditions guarding two of
 * the pg_fatal() calls, and presumably the statements that read the
 * results.  Restore them from the upstream file before compiling.
 */
1512 static void
1514 {
1515 const char *dummy_params[1] = {"1"};
1517
1518 fprintf(stderr, "simple pipeline... ");
1519
1520 /*
1521 * Enter pipeline mode and dispatch a set of operations, which we'll then
1522 * process the results of as they come in.
1523 *
1524 * For a simple case we should be able to do this without interim
1525 * processing of results since our output buffer will give us enough slush
1526 * to work with and we won't block on sending. So blocking mode is fine.
1527 */
1528 if (PQisnonblocking(conn))
1529 pg_fatal("Expected blocking connection mode");
1530
1531 if (PQenterPipelineMode(conn) != 1)
1532 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1533
1534 if (PQsendQueryParams(conn, "SELECT $1",
/* NOTE(review): listing line 1535 (the middle of this argument list) is missing here. */
1536 NULL, NULL, 0) != 1)
1537 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1538
1539 if (PQexitPipelineMode(conn) != 0)
1540 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1541
1542 if (PQpipelineSync(conn) != 1)
1543 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1544
1546
1548
1549 /*
1550 * Even though we've processed the result there's still a sync to come and
1551 * we can't exit pipeline mode yet
1552 */
1553 if (PQexitPipelineMode(conn) != 0)
1554 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1555
1557
1559
1560 /* We're still in pipeline mode... */
/* NOTE(review): listing line 1561 (the condition guarding the pg_fatal() below) is missing; as shown, the call would be unconditional. */
1562 pg_fatal("Fell out of pipeline mode somehow");
1563
1564 /* ... until we end it, which we can safely do now */
1565 if (PQexitPipelineMode(conn) != 1)
1566 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1568
/* NOTE(review): listing line 1569 (the condition guarding the pg_fatal() below) is missing as well. */
1570 pg_fatal("Exiting pipeline mode didn't seem to work");
1571
1572 fprintf(stderr, "ok\n");
1573 }
1574
/*
 * Reviewer summary: exercises single-row and chunked-rows result retrieval
 * in pipeline mode, in three parts.
 *
 * 1. Three "SELECT generate_series(42, $1)" queries are sent (with $1 = 44,
 *    45, 46) followed by one PQpipelineSync().  While reading results,
 *    PQsetSingleRowMode() is requested for the first two queries only; the
 *    first result of queries 0 and 1 must be PGRES_SINGLE_TUPLE and that of
 *    query 2 must be PGRES_TUPLES_OK.  The loop ends on PGRES_PIPELINE_SYNC,
 *    at which point exactly three queries must have been processed.
 * 2. One query is read in single-row mode and a following one without it
 *    (per the existing comment, to check that the flag is reset).
 * 3. "SELECT generate_series(1, 5)" is read with PQsetChunkedRowsMode(conn,
 *    3); the successive results must hold 3, 2 and 0 rows.
 *
 * NOTE(review): the embedded listing numbers skip 1576, 1584, 1593, 1602,
 * 1611, 1625, 1674, 1686, 1692, 1694, 1696, 1701, 1705, 1707, 1716, 1722,
 * 1727, 1732 and 1737, so lines appear to be missing from this copy: the
 * function name line (the symbol index at the end of this listing mentions
 * test_singlerowmode(PGconn *conn)), the declarations of "est" and
 * "saw_ending_tuplesok", the head of one send call, the tails of several
 * pg_fatal() calls, and the statements that fetch the results checked in
 * parts 2 and 3.  Restore them from the upstream file before compiling.
 */
1575 static void
1577 {
1578 PGresult *res;
1579 int i;
1580 bool pipeline_ended = false;
1581
1582 if (PQenterPipelineMode(conn) != 1)
1583 pg_fatal("failed to enter pipeline mode: %s",
1585
1586 /* One series of three commands, using single-row mode for the first two. */
1587 for (i = 0; i < 3; i++)
1588 {
1589 char *param[1];
1590
1591 param[0] = psprintf("%d", 44 + i);
1592
/* NOTE(review): listing line 1593 (the start of the call whose arguments follow) is missing here. */
1594 "SELECT generate_series(42, $1)",
1595 1,
1596 NULL,
1597 (const char *const *) param,
1598 NULL,
1599 NULL,
1600 0) != 1)
1601 pg_fatal("failed to send query: %s",
1603 pfree(param[0]);
1604 }
1605 if (PQpipelineSync(conn) != 1)
1606 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1607
/* One iteration per query; the iteration that reads PGRES_PIPELINE_SYNC sets pipeline_ended and stops the loop. */
1608 for (i = 0; !pipeline_ended; i++)
1609 {
1610 bool first = true;
1612 bool isSingleTuple = false;
1613
1614 /* Set single row mode for only first 2 SELECT queries */
1615 if (i < 2)
1616 {
1617 if (PQsetSingleRowMode(conn) != 1)
1618 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1619 }
1620
1621 /* Consume rows for this query */
1622 saw_ending_tuplesok = false;
1623 while ((res = PQgetResult(conn)) != NULL)
1624 {
/* NOTE(review): listing line 1625 is missing; presumably it declares "est" as the status of res, which the code below relies on -- confirm. */
1626
1627 if (est == PGRES_PIPELINE_SYNC)
1628 {
1629 fprintf(stderr, "end of pipeline reached\n");
1630 pipeline_ended = true;
1631 PQclear(res);
1632 if (i != 3)
1633 pg_fatal("Expected three results, got %d", i);
1634 break;
1635 }
1636
1637 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1638 if (first)
1639 {
1640 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1641 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1642 i, PQresStatus(est));
1643 if (i >= 2 && est != PGRES_TUPLES_OK)
1644 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1645 i, PQresStatus(est));
1646 first = false;
1647 }
1648
1649 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1650 switch (est)
1651 {
1652 case PGRES_TUPLES_OK:
1653 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1654 saw_ending_tuplesok = true;
/* After single-row results, the closing TUPLES_OK must carry zero tuples. */
1655 if (isSingleTuple)
1656 {
1657 if (PQntuples(res) == 0)
1658 fprintf(stderr, "all tuples received in query %d\n", i);
1659 else
1660 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1661 }
1662 break;
1663
1664 case PGRES_SINGLE_TUPLE:
1665 isSingleTuple = true;
1666 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1667 break;
1668
1669 default:
1670 pg_fatal("unexpected");
1671 }
1672 PQclear(res);
1673 }
/* NOTE(review): listing line 1674 (the condition guarding the pg_fatal() below) is missing here. */
1675 pg_fatal("didn't get expected terminating TUPLES_OK");
1676 }
1677
1678 /*
1679 * Now issue one command, get its results in with single-row mode, then
1680 * issue another command, and get its results in normal mode; make sure
1681 * the single-row mode flag is reset as expected.
1682 */
1683 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1684 0, NULL, NULL, NULL, NULL, 0) != 1)
1685 pg_fatal("failed to send query: %s",
1687 if (PQsendFlushRequest(conn) != 1)
1688 pg_fatal("failed to send flush request");
1689 if (PQsetSingleRowMode(conn) != 1)
1690 pg_fatal("PQsetSingleRowMode() failed");
1691
1693
1695
1697
1698 if (PQsendQueryParams(conn, "SELECT 1",
1699 0, NULL, NULL, NULL, NULL, 0) != 1)
1700 pg_fatal("failed to send query: %s",
1702 if (PQsendFlushRequest(conn) != 1)
1703 pg_fatal("failed to send flush request");
1704
1706
1708
1709 /*
1710 * Try chunked mode as well; make sure that it correctly delivers a
1711 * partial final chunk.
1712 */
1713 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1714 0, NULL, NULL, NULL, NULL, 0) != 1)
1715 pg_fatal("failed to send query: %s",
1717 if (PQsendFlushRequest(conn) != 1)
1718 pg_fatal("failed to send flush request");
1719 if (PQsetChunkedRowsMode(conn, 3) != 1)
1720 pg_fatal("PQsetChunkedRowsMode() failed");
1721
/* NOTE(review): listing lines 1722, 1727 and 1732 are missing; presumably each one fetches the "res" that the following check inspects -- confirm. */
1723 if (PQntuples(res) != 3)
1724 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1725 PQclear(res);
1726
1728 if (PQntuples(res) != 2)
1729 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1730 PQclear(res);
1731
1733 if (PQntuples(res) != 0)
1734 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1735 PQclear(res);
1736
1738
1739 if (PQexitPipelineMode(conn) != 1)
1740 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1741
1742 fprintf(stderr, "ok\n");
1743 }
1744
1745/*
1746 * Simple test to verify that a pipeline is discarded as a whole when there's
1747 * an error, ignoring transaction commands.
1748 */
1749static void
1751{
1752 PGresult *res;
1753 bool expect_null;
1754 int num_syncs = 0;
1755
1756 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1757 "CREATE TABLE pq_pipeline_tst (id int)");
1758 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1759 pg_fatal("failed to create test table: %s",
1761 PQclear(res);
1762
1763 if (PQenterPipelineMode(conn) != 1)
1764 pg_fatal("failed to enter pipeline mode: %s",
1766 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1767 pg_fatal("could not send prepare on pipeline: %s",
1769
1771 "BEGIN",
1772 0, NULL, NULL, NULL, NULL, 0) != 1)
1773 pg_fatal("failed to send query: %s",
1776 "SELECT 0/0",
1777 0, NULL, NULL, NULL, NULL, 0) != 1)
1778 pg_fatal("failed to send query: %s",
1780
1781 /*
1782 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1783 * get out of the pipeline-aborted state first.
1784 */
1785 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1786 pg_fatal("failed to execute prepared: %s",
1788
1789 /* This insert fails because we're in pipeline-aborted state */
1791 "INSERT INTO pq_pipeline_tst VALUES (1)",
1792 0, NULL, NULL, NULL, NULL, 0) != 1)
1793 pg_fatal("failed to send query: %s",
1795 if (PQpipelineSync(conn) != 1)
1796 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1797 num_syncs++;
1798
1799 /*
1800 * This insert fails even though the pipeline got a SYNC, because we're in
1801 * an aborted transaction
1802 */
1804 "INSERT INTO pq_pipeline_tst VALUES (2)",
1805 0, NULL, NULL, NULL, NULL, 0) != 1)
1806 pg_fatal("failed to send query: %s",
1808 if (PQpipelineSync(conn) != 1)
1809 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1810 num_syncs++;
1811
1812 /*
1813 * Send ROLLBACK using prepared stmt. This one works because we just did
1814 * PQpipelineSync above.
1815 */
1816 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1817 pg_fatal("failed to execute prepared: %s",
1819
1820 /*
1821 * Now that we're out of a transaction and in pipeline-good mode, this
1822 * insert works
1823 */
1825 "INSERT INTO pq_pipeline_tst VALUES (3)",
1826 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 pg_fatal("failed to send query: %s",
1829 /* Send two syncs now -- match up to SYNC messages below */
1830 if (PQpipelineSync(conn) != 1)
1831 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1832 num_syncs++;
1833 if (PQpipelineSync(conn) != 1)
1834 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1835 num_syncs++;
1836
1837 expect_null = false;
1838 for (int i = 0;; i++)
1839 {
1841
1842 res = PQgetResult(conn);
1843 if (res == NULL)
1844 {
1845 printf("%d: got NULL result\n", i);
1846 if (!expect_null)
1847 pg_fatal("did not expect NULL here");
1848 expect_null = false;
1849 continue;
1850 }
1851 restype = PQresultStatus(res);
1852 printf("%d: got status %s", i, PQresStatus(restype));
1853 if (expect_null)
1854 pg_fatal("expected NULL");
1856 printf("; error: %s", PQerrorMessage(conn));
1857 else if (restype == PGRES_PIPELINE_ABORTED)
1858 {
1859 printf(": command didn't run because pipeline aborted\n");
1860 }
1861 else
1862 printf("\n");
1863 PQclear(res);
1864
1866 num_syncs--;
1867 else
1868 expect_null = true;
1869 if (num_syncs <= 0)
1870 break;
1871 }
1872
1874
1875 if (PQexitPipelineMode(conn) != 1)
1876 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1877
1878 /* We expect to find one tuple containing the value "3" */
1879 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1880 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1881 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1882 if (PQntuples(res) != 1)
1883 pg_fatal("did not get 1 tuple");
1884 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1885 pg_fatal("did not get expected tuple");
1886 PQclear(res);
1887
1888 fprintf(stderr, "ok\n");
1889}
1890
1891/*
1892 * In this test mode we send a stream of queries, with one in the middle
1893 * causing an error. Verify that we can still send some more after the
1894 * error and have libpq work properly.
1895 */
1896static void
1898{
1899 int sock = PQsocket(conn);
1900 PGresult *res;
1901 Oid paramTypes[2] = {INT8OID, INT8OID};
1902 const char *paramValues[2];
1903 char paramValue0[MAXINT8LEN];
1904 char paramValue1[MAXINT8LEN];
1905 int ctr = 0;
1906 int numsent = 0;
1907 int results = 0;
1908 bool read_done = false;
1909 bool write_done = false;
1910 bool error_sent = false;
1911 bool got_error = false;
1912 int switched = 0;
1913 int socketful = 0;
1914 fd_set in_fds;
1916
1917 fprintf(stderr, "uniqviol ...");
1918
1920
1923 sprintf(paramValue1, "42");
1924
1925 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1926 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1927 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1928 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1929 PQclear(res);
1930
1931 res = PQexec(conn, "begin");
1932 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1933 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1934 PQclear(res);
1935
1936 res = PQprepare(conn, "insertion",
1937 "insert into ppln_uniqviol values ($1, $2) returning id",
1938 2, paramTypes);
1939 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1940 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1941 PQclear(res);
1942
1943 if (PQenterPipelineMode(conn) != 1)
1944 pg_fatal("failed to enter pipeline mode");
1945
1946 while (!read_done)
1947 {
1948 /*
1949 * Avoid deadlocks by reading everything the server has sent before
1950 * sending anything. (Special precaution is needed here to process
1951 * PQisBusy before testing the socket for read-readiness, because the
1952 * socket does not turn read-ready after "sending" queries in aborted
1953 * pipeline mode.)
1954 */
1955 while (PQisBusy(conn) == 0)
1956 {
1957 bool new_error;
1958
1959 if (results >= numsent)
1960 {
1961 if (write_done)
1962 read_done = true;
1963 break;
1964 }
1965
1966 res = PQgetResult(conn);
1967 new_error = process_result(conn, res, results, numsent);
1968 if (new_error && got_error)
1969 pg_fatal("got two errors");
1971 if (results++ >= numsent - 1)
1972 {
1973 if (write_done)
1974 read_done = true;
1975 break;
1976 }
1977 }
1978
1979 if (read_done)
1980 break;
1981
1982 FD_ZERO(&out_fds);
1983 FD_SET(sock, &out_fds);
1984
1985 FD_ZERO(&in_fds);
1986 FD_SET(sock, &in_fds);
1987
1988 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1989 {
1990 if (errno == EINTR)
1991 continue;
1992 pg_fatal("select() failed: %m");
1993 }
1994
1995 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1996 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1997
1998 /*
1999 * If the socket is writable and we haven't finished sending queries,
2000 * send some.
2001 */
2002 if (!write_done && FD_ISSET(sock, &out_fds))
2003 {
2004 for (;;)
2005 {
2006 int flush;
2007
2008 /*
2009 * provoke uniqueness violation exactly once after having
2010 * switched to read mode.
2011 */
2012 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2013 {
2014 sprintf(paramValue0, "%d", numsent / 2);
2015 fprintf(stderr, "E");
2016 error_sent = true;
2017 }
2018 else
2019 {
2020 fprintf(stderr, ".");
2021 sprintf(paramValue0, "%d", ctr++);
2022 }
2023
2024 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2025 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2026 numsent++;
2027
2028 /* Are we done writing? */
2029 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2030 {
2031 if (PQsendFlushRequest(conn) != 1)
2032 pg_fatal("failed to send flush request");
2033 write_done = true;
2034 fprintf(stderr, "\ndone writing\n");
2035 PQflush(conn);
2036 break;
2037 }
2038
2039 /* is the outgoing socket full? */
2040 flush = PQflush(conn);
2041 if (flush == -1)
2042 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2043 if (flush == 1)
2044 {
2045 if (socketful == 0)
2047 fprintf(stderr, "\nswitch to reading\n");
2048 switched++;
2049 break;
2050 }
2051 }
2052 }
2053 }
2054
2055 if (!got_error)
2056 pg_fatal("did not get expected error");
2057
2058 fprintf(stderr, "ok\n");
2059}
2060
2061/*
2062 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2063 * the expected NULL that should follow it.
2064 *
2065 * Returns true if we read a fatal error message, otherwise false.
2066 */
2067static bool
2068process_result(PGconn *conn, PGresult *res, int results, int numsent)
2069{
2070 bool got_error = false;
2071
2072 if (res == NULL)
2073 pg_fatal("got unexpected NULL");
2074
2075 switch (PQresultStatus(res))
2076 {
2077 case PGRES_FATAL_ERROR:
2078 got_error = true;
2079 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2080 PQclear(res);
2082 break;
2083
2084 case PGRES_TUPLES_OK:
2085 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2086 PQclear(res);
2088 break;
2089
2091 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2092 PQclear(res);
2094 break;
2095
2096 default:
2097 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2098 }
2099
2100 return got_error;
2101}
2102
2103
/*
 * Print a short help text on stderr.
 *
 * progname is the program name to show in the banner and in the two
 * invocation forms.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	/* banner, followed by the two ways to invoke the program */
	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* option summary; these lines take no arguments */
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
2115
/*
 * Print the name of every test, one per line, on stdout.
 *
 * The names are the same TESTNAME strings that main() compares against when
 * it picks the test to run, so keep the two lists in sync.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"protocol_version",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};
	const size_t ntests = sizeof(test_names) / sizeof(test_names[0]);

	for (size_t i = 0; i < ntests; i++)
		printf("%s\n", test_names[i]);
}
2133
2134int
2135main(int argc, char **argv)
2136{
2137 const char *conninfo = "";
2138 PGconn *conn;
2139 FILE *trace = NULL;
2140 char *testname;
2141 int numrows = 10000;
2142 PGresult *res;
2143 int c;
2144
2145 while ((c = getopt(argc, argv, "r:t:")) != -1)
2146 {
2147 switch (c)
2148 {
2149 case 'r': /* numrows */
2150 errno = 0;
2151 numrows = strtol(optarg, NULL, 10);
2152 if (errno != 0 || numrows <= 0)
2153 {
2154 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2155 optarg);
2156 exit(1);
2157 }
2158 break;
2159 case 't': /* trace file */
2161 break;
2162 }
2163 }
2164
2165 if (optind < argc)
2166 {
2167 testname = pg_strdup(argv[optind]);
2168 optind++;
2169 }
2170 else
2171 {
2172 usage(argv[0]);
2173 exit(1);
2174 }
2175
2176 if (strcmp(testname, "tests") == 0)
2177 {
2179 exit(0);
2180 }
2181
2182 if (optind < argc)
2183 {
2184 conninfo = pg_strdup(argv[optind]);
2185 optind++;
2186 }
2187
2188 /* Make a connection to the database */
2189 conn = PQconnectdb(conninfo);
2190 if (PQstatus(conn) != CONNECTION_OK)
2191 {
2192 fprintf(stderr, "Connection to database failed: %s\n",
2195 }
2196
2197 res = PQexec(conn, "SET lc_messages TO \"C\"");
2198 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2199 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2200 PQclear(res);
2201 res = PQexec(conn, "SET debug_parallel_query = off");
2202 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2203 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2204 PQclear(res);
2205
2206 /* Set the trace file, if requested */
2207 if (tracefile != NULL)
2208 {
2209 if (strcmp(tracefile, "-") == 0)
2210 trace = stdout;
2211 else
2212 trace = fopen(tracefile, "w");
2213 if (trace == NULL)
2214 pg_fatal("could not open file \"%s\": %m", tracefile);
2215
2216 /* Make it line-buffered */
2217 setvbuf(trace, NULL, PG_IOLBF, 0);
2218
2219 PQtrace(conn, trace);
2222 }
2223
2224 if (strcmp(testname, "cancel") == 0)
2226 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2228 else if (strcmp(testname, "multi_pipelines") == 0)
2230 else if (strcmp(testname, "nosync") == 0)
2232 else if (strcmp(testname, "pipeline_abort") == 0)
2234 else if (strcmp(testname, "pipeline_idle") == 0)
2236 else if (strcmp(testname, "pipelined_insert") == 0)
2237 test_pipelined_insert(conn, numrows);
2238 else if (strcmp(testname, "prepared") == 0)
2240 else if (strcmp(testname, "protocol_version") == 0)
2242 else if (strcmp(testname, "simple_pipeline") == 0)
2244 else if (strcmp(testname, "singlerow") == 0)
2246 else if (strcmp(testname, "transaction") == 0)
2248 else if (strcmp(testname, "uniqviol") == 0)
2250 else
2251 {
2252 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2253 exit(1);
2254 }
2255
2256 /* close the connection to the database and cleanup */
2257 PQfinish(conn);
2258
2259 if (trace && trace != stdout)
2260 fclose(trace);
2261
2262 return 0;
2263}
#define pg_noreturn
Definition c.h:164
#define Assert(condition)
Definition c.h:873
#define pg_attribute_printf(f, a)
Definition c.h:242
#define lengthof(array)
Definition c.h:803
static PGcancel *volatile cancelConn
Definition cancel.c:43
int main(void)
#define fprintf(file, fmt, msg)
Definition cubescan.l:21
PGcancel * PQgetCancel(PGconn *conn)
Definition fe-cancel.c:368
void PQcancelReset(PGcancelConn *cancelConn)
Definition fe-cancel.c:337
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition fe-cancel.c:68
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition fe-cancel.c:302
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition fe-cancel.c:190
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition fe-cancel.c:548
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition fe-cancel.c:226
void PQcancelFinish(PGcancelConn *cancelConn)
Definition fe-cancel.c:353
int PQrequestCancel(PGconn *conn)
Definition fe-cancel.c:752
void PQfreeCancel(PGcancel *cancel)
Definition fe-cancel.c:502
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition fe-cancel.c:313
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition fe-cancel.c:325
int PQcancelStart(PGcancelConn *cancelConn)
Definition fe-cancel.c:204
int PQfullProtocolVersion(const PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:825
PQconninfoOption * PQconninfo(PGconn *conn)
void PQconninfoFree(PQconninfoOption *connOptions)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
int PQbackendPID(const PGconn *conn)
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition fe-connect.c:770
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:1509
int PQsetSingleRowMode(PGconn *conn)
Definition fe-exec.c:1965
int PQflush(PGconn *conn)
Definition fe-exec.c:4017
Oid PQftype(const PGresult *res, int field_num)
Definition fe-exec.c:3736
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition fe-exec.c:2472
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:2293
int PQexitPipelineMode(PGconn *conn)
Definition fe-exec.c:3090
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition fe-exec.c:2586
int PQenterPipelineMode(PGconn *conn)
Definition fe-exec.c:3059
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition fe-exec.c:2538
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition fe-exec.c:2556
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition fe-exec.c:2573
int PQsendPipelineSync(PGconn *conn)
Definition fe-exec.c:3299
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition fe-exec.c:2323
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition fe-exec.c:2508
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsetnonblocking(PGconn *conn, int arg)
Definition fe-exec.c:3961
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition fe-exec.c:1553
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition fe-exec.c:2491
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition fe-exec.c:1982
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQpipelineSync(PGconn *conn)
Definition fe-exec.c:3289
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition fe-exec.c:2521
char * PQresStatus(ExecStatusType status)
Definition fe-exec.c:3436
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:1650
int PQsendFlushRequest(PGconn *conn)
Definition fe-exec.c:3388
int PQisnonblocking(const PGconn *conn)
Definition fe-exec.c:4000
void PQtrace(PGconn *conn, FILE *debug_port)
Definition fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition fe-trace.c:64
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition fe_memutils.c:53
long val
Definition informix.c:689
static struct @172 value
int i
Definition isn.c:77
static const JsonPathKeyword keywords[]
#define PQgetvalue
#define PQgetResult
#define PQcmdStatus
#define PQclear
#define PQresultErrorField
#define PQnfields
#define PQresultStatus
#define PQntuples
@ CONNECTION_BAD
Definition libpq-fe.h:85
@ CONNECTION_OK
Definition libpq-fe.h:84
ExecStatusType
Definition libpq-fe.h:123
@ PGRES_COMMAND_OK
Definition libpq-fe.h:125
@ PGRES_TUPLES_CHUNK
Definition libpq-fe.h:142
@ PGRES_FATAL_ERROR
Definition libpq-fe.h:136
@ PGRES_SINGLE_TUPLE
Definition libpq-fe.h:138
@ PGRES_PIPELINE_SYNC
Definition libpq-fe.h:139
@ PGRES_PIPELINE_ABORTED
Definition libpq-fe.h:140
@ PGRES_TUPLES_OK
Definition libpq-fe.h:128
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition libpq-fe.h:478
PostgresPollingStatusType
Definition libpq-fe.h:114
@ PGRES_POLLING_OK
Definition libpq-fe.h:118
@ PGRES_POLLING_READING
Definition libpq-fe.h:116
@ PGRES_POLLING_WRITING
Definition libpq-fe.h:117
@ PQ_PIPELINE_OFF
Definition libpq-fe.h:187
@ PQ_PIPELINE_ABORTED
Definition libpq-fe.h:189
#define PQTRACE_REGRESS_MODE
Definition libpq-fe.h:480
#define MAXINT8LEN
static void print_test_list(void)
static const char *const insert_sql2
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
#define consume_query_cancel(conn)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static pg_noreturn void static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static void consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
static const char *const insert_sql
#define pg_debug(...)
static void consume_null_result_impl(int line, PGconn *conn)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void test_protocol_version(PGconn *conn)
static void test_nosync(PGconn *conn)
#define consume_null_result(conn)
static PGresult * confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
#define confirm_result_status(conn, status)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static pg_noreturn void pg_fatal_impl(int line, const char *fmt,...) pg_attribute_printf(2
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
#define consume_result_status(conn, status)
static void test_transaction(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
#define pg_fatal(...)
static void consume_query_cancel_impl(int line, PGconn *conn)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
Definition mcxt.c:1616
static AmcheckOptions opts
Definition pg_amcheck.c:112
static void usage(void)
void * arg
PGDLLIMPORT int optind
Definition getopt.c:51
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition getopt.c:72
PGDLLIMPORT char * optarg
Definition getopt.c:53
#define PG_IOLBF
Definition port.h:409
#define sprintf
Definition port.h:262
#define vfprintf
Definition port.h:263
#define snprintf
Definition port.h:260
#define printf(...)
Definition port.h:266
unsigned int Oid
#define PG_DIAG_SQLSTATE
char * c
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
void pg_usleep(long microsec)
Definition signal.c:53
PGconn * conn
Definition streamutil.c:52
const char * keyword
const char * description
#define EINTR
Definition win32_port.h:361
#define select(n, r, w, e, timeout)
Definition win32_port.h:500