PostgreSQL Source Code  git master
libpq_pipeline.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  * Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/select.h>
19 #include <sys/time.h>
20 
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
23 #include "libpq-fe.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
26 
27 
28 static void exit_nicely(PGconn *conn);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32  int numsent);
33 
34 static const char *const progname = "libpq_pipeline";
35 
36 /* Options and defaults */
37 static char *tracefile = NULL; /* path to PQtrace() file */
38 
39 
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
45 
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 
56 /* max char length of an int32/64, plus sign and null terminator */
57 #define MAXINTLEN 12
58 #define MAXINT8LEN 20
59 
60 static void
62 {
63  PQfinish(conn);
64  exit(1);
65 }
66 
67 /*
68  * The following few functions are wrapped in macros to make the reported line
69  * number in an error match the line number of the invocation.
70  */
71 
72 /*
73  * Print an error to stderr and terminate the program.
74  */
75 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
76 static void
78 pg_fatal_impl(int line, const char *fmt,...)
79 {
80  va_list args;
81 
83 
84  fprintf(stderr, "\n%s:%d: ", progname, line);
86  vfprintf(stderr, fmt, args);
88  Assert(fmt[strlen(fmt) - 1] != '\n');
89  fprintf(stderr, "\n");
90  exit(1);
91 }
92 
93 /*
94  * Check that the query on the given connection got canceled.
95  */
96 #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
97 static void
99 {
100  PGresult *res = NULL;
101 
102  res = PQgetResult(conn);
103  if (res == NULL)
104  pg_fatal_impl(line, "PQgetResult returned null: %s",
107  pg_fatal_impl(line, "query did not fail when it was expected");
108  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
109  pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
111  PQclear(res);
112 
113  while (PQisBusy(conn))
115 }
116 
117 /*
118  * Using monitorConn, query pg_stat_activity to see that the connection with
119  * the given PID is either in the given state, or waiting on the given event
120  * (only one of them can be given).
121  */
122 static void
123 wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
124  char *state, char *event)
125 {
126  const Oid paramTypes[] = {INT4OID, TEXTOID};
127  const char *paramValues[2];
128  char *pidstr = psprintf("%d", procpid);
129 
130  Assert((state == NULL) ^ (event == NULL));
131 
132  paramValues[0] = pidstr;
133  paramValues[1] = state ? state : event;
134 
135  while (true)
136  {
137  PGresult *res;
138  char *value;
139 
140  if (state != NULL)
141  res = PQexecParams(monitorConn,
142  "SELECT count(*) FROM pg_stat_activity WHERE "
143  "pid = $1 AND state = $2",
144  2, paramTypes, paramValues, NULL, NULL, 0);
145  else
146  res = PQexecParams(monitorConn,
147  "SELECT count(*) FROM pg_stat_activity WHERE "
148  "pid = $1 AND wait_event = $2",
149  2, paramTypes, paramValues, NULL, NULL, 0);
150 
152  pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
153  if (PQntuples(res) != 1)
154  pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
155  if (PQnfields(res) != 1)
156  pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
157  value = PQgetvalue(res, 0, 0);
158  if (strcmp(value, "0") != 0)
159  {
160  PQclear(res);
161  break;
162  }
163  PQclear(res);
164 
165  /* wait 10ms before polling again */
166  pg_usleep(10000);
167  }
168 
169  pfree(pidstr);
170 }
171 
172 #define send_cancellable_query(conn, monitorConn) \
173  send_cancellable_query_impl(__LINE__, conn, monitorConn)
174 static void
176 {
177  const char *env_wait;
178  const Oid paramTypes[1] = {INT4OID};
179 
180  /*
181  * Wait for the connection to be idle, so that our check for an active
182  * connection below is reliable, instead of possibly seeing an outdated
183  * state.
184  */
185  wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
186 
187  env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
188  if (env_wait == NULL)
189  env_wait = "180";
190 
191  if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
192  &env_wait, NULL, NULL, 0) != 1)
193  pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
194 
195  /*
196  * Wait for the sleep to be active, because if the query is not running
197  * yet, the cancel request that we send won't have any effect.
198  */
199  wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
200 }
201 
202 /*
203  * Create a new connection with the same conninfo as the given one.
204  */
205 static PGconn *
207 {
208  PGconn *copyConn;
210  const char **keywords;
211  const char **vals;
212  int nopts = 1;
213  int i = 0;
214 
215  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
216  nopts++;
217 
218  keywords = pg_malloc(sizeof(char *) * nopts);
219  vals = pg_malloc(sizeof(char *) * nopts);
220 
221  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
222  {
223  if (opt->val)
224  {
225  keywords[i] = opt->keyword;
226  vals[i] = opt->val;
227  i++;
228  }
229  }
230  keywords[i] = vals[i] = NULL;
231 
232  copyConn = PQconnectdbParams(keywords, vals, false);
233 
234  if (PQstatus(copyConn) != CONNECTION_OK)
235  pg_fatal("Connection to database failed: %s",
236  PQerrorMessage(copyConn));
237 
238  return copyConn;
239 }
240 
241 /*
242  * Test query cancellation routines
243  */
244 static void
246 {
247  PGcancel *cancel;
249  PGconn *monitorConn;
250  char errorbuf[256];
251 
252  fprintf(stderr, "test cancellations... ");
253 
254  if (PQsetnonblocking(conn, 1) != 0)
255  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
256 
257  /*
258  * Make a separate connection to the database to monitor the query on the
259  * main connection.
260  */
261  monitorConn = copy_connection(conn);
262  Assert(PQstatus(monitorConn) == CONNECTION_OK);
263 
264  /* test PQcancel */
265  send_cancellable_query(conn, monitorConn);
266  cancel = PQgetCancel(conn);
267  if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
268  pg_fatal("failed to run PQcancel: %s", errorbuf);
270 
271  /* PGcancel object can be reused for the next query */
272  send_cancellable_query(conn, monitorConn);
273  if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
274  pg_fatal("failed to run PQcancel: %s", errorbuf);
276 
277  PQfreeCancel(cancel);
278 
279  /* test PQrequestCancel */
280  send_cancellable_query(conn, monitorConn);
281  if (!PQrequestCancel(conn))
282  pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
284 
285  /* test PQcancelBlocking */
286  send_cancellable_query(conn, monitorConn);
289  pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
292 
293  /* test PQcancelCreate and then polling with PQcancelPoll */
294  send_cancellable_query(conn, monitorConn);
297  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
298  while (true)
299  {
300  struct timeval tv;
301  fd_set input_mask;
302  fd_set output_mask;
304  int sock = PQcancelSocket(cancelConn);
305 
306  if (pollres == PGRES_POLLING_OK)
307  break;
308 
309  FD_ZERO(&input_mask);
310  FD_ZERO(&output_mask);
311  switch (pollres)
312  {
314  pg_debug("polling for reads\n");
315  FD_SET(sock, &input_mask);
316  break;
318  pg_debug("polling for writes\n");
319  FD_SET(sock, &output_mask);
320  break;
321  default:
322  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
323  }
324 
325  if (sock < 0)
326  pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
327 
328  tv.tv_sec = 3;
329  tv.tv_usec = 0;
330 
331  while (true)
332  {
333  if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
334  {
335  if (errno == EINTR)
336  continue;
337  pg_fatal("select() failed: %m");
338  }
339  break;
340  }
341  }
343  pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
345 
346  /*
347  * test PQcancelReset works on the cancel connection and it can be reused
348  * afterwards
349  */
351 
352  send_cancellable_query(conn, monitorConn);
354  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
355  while (true)
356  {
357  struct timeval tv;
358  fd_set input_mask;
359  fd_set output_mask;
361  int sock = PQcancelSocket(cancelConn);
362 
363  if (pollres == PGRES_POLLING_OK)
364  break;
365 
366  FD_ZERO(&input_mask);
367  FD_ZERO(&output_mask);
368  switch (pollres)
369  {
371  pg_debug("polling for reads\n");
372  FD_SET(sock, &input_mask);
373  break;
375  pg_debug("polling for writes\n");
376  FD_SET(sock, &output_mask);
377  break;
378  default:
379  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
380  }
381 
382  if (sock < 0)
383  pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
384 
385  tv.tv_sec = 3;
386  tv.tv_usec = 0;
387 
388  while (true)
389  {
390  if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
391  {
392  if (errno == EINTR)
393  continue;
394  pg_fatal("select() failed: %m");
395  }
396  break;
397  }
398  }
400  pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
402 
404 
405  fprintf(stderr, "ok\n");
406 }
407 
408 static void
410 {
411  PGresult *res = NULL;
412 
413  fprintf(stderr, "test error cases... ");
414 
415  if (PQisnonblocking(conn))
416  pg_fatal("Expected blocking connection mode");
417 
418  if (PQenterPipelineMode(conn) != 1)
419  pg_fatal("Unable to enter pipeline mode");
420 
422  pg_fatal("Pipeline mode not activated properly");
423 
424  /* PQexec should fail in pipeline mode */
425  res = PQexec(conn, "SELECT 1");
427  pg_fatal("PQexec should fail in pipeline mode but succeeded");
428  if (strcmp(PQerrorMessage(conn),
429  "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
430  pg_fatal("did not get expected error message; got: \"%s\"",
432 
433  /* PQsendQuery should fail in pipeline mode */
434  if (PQsendQuery(conn, "SELECT 1") != 0)
435  pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
436  if (strcmp(PQerrorMessage(conn),
437  "PQsendQuery not allowed in pipeline mode\n") != 0)
438  pg_fatal("did not get expected error message; got: \"%s\"",
440 
441  /* Entering pipeline mode when already in pipeline mode is OK */
442  if (PQenterPipelineMode(conn) != 1)
443  pg_fatal("re-entering pipeline mode should be a no-op but failed");
444 
445  if (PQisBusy(conn) != 0)
446  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
447 
448  /* ok, back to normal command mode */
449  if (PQexitPipelineMode(conn) != 1)
450  pg_fatal("couldn't exit idle empty pipeline mode");
451 
453  pg_fatal("Pipeline mode not terminated properly");
454 
455  /* exiting pipeline mode when not in pipeline mode should be a no-op */
456  if (PQexitPipelineMode(conn) != 1)
457  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
458 
459  /* can now PQexec again */
460  res = PQexec(conn, "SELECT 1");
462  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
464 
465  fprintf(stderr, "ok\n");
466 }
467 
468 static void
470 {
471  PGresult *res = NULL;
472  const char *dummy_params[1] = {"1"};
473  Oid dummy_param_oids[1] = {INT4OID};
474 
475  fprintf(stderr, "multi pipeline... ");
476 
477  /*
478  * Queue up a couple of small pipelines and process each without returning
479  * to command mode first.
480  */
481  if (PQenterPipelineMode(conn) != 1)
482  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
483 
484  /* first pipeline */
485  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
486  dummy_params, NULL, NULL, 0) != 1)
487  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
488 
489  if (PQpipelineSync(conn) != 1)
490  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
491 
492  /* second pipeline */
493  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
494  dummy_params, NULL, NULL, 0) != 1)
495  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
496 
497  /* Skip flushing once. */
498  if (PQsendPipelineSync(conn) != 1)
499  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
500 
501  /* third pipeline */
502  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
503  dummy_params, NULL, NULL, 0) != 1)
504  pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
505 
506  if (PQpipelineSync(conn) != 1)
507  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
508 
509  /* OK, start processing the results */
510 
511  /* first pipeline */
512 
513  res = PQgetResult(conn);
514  if (res == NULL)
515  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
517 
519  pg_fatal("Unexpected result code %s from first pipeline item",
521  PQclear(res);
522  res = NULL;
523 
524  if (PQgetResult(conn) != NULL)
525  pg_fatal("PQgetResult returned something extra after first result");
526 
527  if (PQexitPipelineMode(conn) != 0)
528  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
529 
530  res = PQgetResult(conn);
531  if (res == NULL)
532  pg_fatal("PQgetResult returned null when sync result expected: %s",
534 
536  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
538  PQclear(res);
539 
540  /* second pipeline */
541 
542  res = PQgetResult(conn);
543  if (res == NULL)
544  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
546 
548  pg_fatal("Unexpected result code %s from second pipeline item",
550  PQclear(res);
551  res = NULL;
552 
553  if (PQgetResult(conn) != NULL)
554  pg_fatal("PQgetResult returned something extra after first result");
555 
556  if (PQexitPipelineMode(conn) != 0)
557  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
558 
559  res = PQgetResult(conn);
560  if (res == NULL)
561  pg_fatal("PQgetResult returned null when sync result expected: %s",
563 
565  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
567  PQclear(res);
568 
569  /* third pipeline */
570 
571  res = PQgetResult(conn);
572  if (res == NULL)
573  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
575 
577  pg_fatal("Unexpected result code %s from third pipeline item",
579 
580  res = PQgetResult(conn);
581  if (res != NULL)
582  pg_fatal("Expected null result, got %s",
584 
585  res = PQgetResult(conn);
586  if (res == NULL)
587  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
589 
591  pg_fatal("Unexpected result code %s from second pipeline sync",
593 
594  /* We're still in pipeline mode ... */
596  pg_fatal("Fell out of pipeline mode somehow");
597 
598  /* until we end it, which we can safely do now */
599  if (PQexitPipelineMode(conn) != 1)
600  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
602 
604  pg_fatal("exiting pipeline mode didn't seem to work");
605 
606  fprintf(stderr, "ok\n");
607 }
608 
609 /*
610  * Test behavior when a pipeline dispatches a number of commands that are
611  * not flushed by a sync point.
612  */
613 static void
615 {
616  int numqueries = 10;
617  int results = 0;
618  int sock = PQsocket(conn);
619 
620  fprintf(stderr, "nosync... ");
621 
622  if (sock < 0)
623  pg_fatal("invalid socket");
624 
625  if (PQenterPipelineMode(conn) != 1)
626  pg_fatal("could not enter pipeline mode");
627  for (int i = 0; i < numqueries; i++)
628  {
629  fd_set input_mask;
630  struct timeval tv;
631 
632  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
633  0, NULL, NULL, NULL, NULL, 0) != 1)
634  pg_fatal("error sending select: %s", PQerrorMessage(conn));
635  PQflush(conn);
636 
637  /*
638  * If the server has written anything to us, read (some of) it now.
639  */
640  FD_ZERO(&input_mask);
641  FD_SET(sock, &input_mask);
642  tv.tv_sec = 0;
643  tv.tv_usec = 0;
644  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
645  {
646  fprintf(stderr, "select() failed: %m\n");
647  exit_nicely(conn);
648  }
649  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
650  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
651  }
652 
653  /* tell server to flush its output buffer */
654  if (PQsendFlushRequest(conn) != 1)
655  pg_fatal("failed to send flush request");
656  PQflush(conn);
657 
658  /* Now read all results */
659  for (;;)
660  {
661  PGresult *res;
662 
663  res = PQgetResult(conn);
664 
665  /* NULL results are only expected after TUPLES_OK */
666  if (res == NULL)
667  pg_fatal("got unexpected NULL result after %d results", results);
668 
669  /* We expect exactly one TUPLES_OK result for each query we sent */
671  {
672  PGresult *res2;
673 
674  /* and one NULL result should follow each */
675  res2 = PQgetResult(conn);
676  if (res2 != NULL)
677  pg_fatal("expected NULL, got %s",
678  PQresStatus(PQresultStatus(res2)));
679  PQclear(res);
680  results++;
681 
682  /* if we're done, we're done */
683  if (results == numqueries)
684  break;
685 
686  continue;
687  }
688 
689  /* anything else is unexpected */
690  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
691  }
692 
693  fprintf(stderr, "ok\n");
694 }
695 
696 /*
697  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
698  * still have to get results for each pipeline item, but the item will just be
699  * a PGRES_PIPELINE_ABORTED code.
700  *
701  * This intentionally doesn't use a transaction to wrap the pipeline. You should
702  * usually use an xact, but in this case we want to observe the effects of each
703  * statement.
704  */
705 static void
707 {
708  PGresult *res = NULL;
709  const char *dummy_params[1] = {"1"};
710  Oid dummy_param_oids[1] = {INT4OID};
711  int i;
712  int gotrows;
713  bool goterror;
714 
715  fprintf(stderr, "aborted pipeline... ");
716 
719  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
720 
723  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
724 
725  /*
726  * Queue up a couple of small pipelines and process each without returning
727  * to command mode first. Make sure the second operation in the first
728  * pipeline ERRORs.
729  */
730  if (PQenterPipelineMode(conn) != 1)
731  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
732 
733  dummy_params[0] = "1";
734  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
735  dummy_params, NULL, NULL, 0) != 1)
736  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
737 
738  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
739  1, dummy_param_oids, dummy_params,
740  NULL, NULL, 0) != 1)
741  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
742 
743  dummy_params[0] = "2";
744  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
745  dummy_params, NULL, NULL, 0) != 1)
746  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
747 
748  if (PQpipelineSync(conn) != 1)
749  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
750 
751  dummy_params[0] = "3";
752  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
753  dummy_params, NULL, NULL, 0) != 1)
754  pg_fatal("dispatching second-pipeline insert failed: %s",
756 
757  if (PQpipelineSync(conn) != 1)
758  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
759 
760  /*
761  * OK, start processing the pipeline results.
762  *
763  * We should get a command-ok for the first query, then a fatal error and
764  * a pipeline aborted message for the second insert, a pipeline-end, then
765  * a command-ok and a pipeline-ok for the second pipeline operation.
766  */
767  res = PQgetResult(conn);
768  if (res == NULL)
769  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
771  pg_fatal("Unexpected result status %s: %s",
774  PQclear(res);
775 
776  /* NULL result to signal end-of-results for this command */
777  if ((res = PQgetResult(conn)) != NULL)
778  pg_fatal("Expected null result, got %s",
780 
781  /* Second query caused error, so we expect an error next */
782  res = PQgetResult(conn);
783  if (res == NULL)
784  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
786  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
788  PQclear(res);
789 
790  /* NULL result to signal end-of-results for this command */
791  if ((res = PQgetResult(conn)) != NULL)
792  pg_fatal("Expected null result, got %s",
794 
795  /*
796  * pipeline should now be aborted.
797  *
798  * Note that we could still queue more queries at this point if we wanted;
799  * they'd get added to a new third pipeline since we've already sent a
800  * second. The aborted flag relates only to the pipeline being received.
801  */
803  pg_fatal("pipeline should be flagged as aborted but isn't");
804 
805  /* third query in pipeline, the second insert */
806  res = PQgetResult(conn);
807  if (res == NULL)
808  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
810  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
812  PQclear(res);
813 
814  /* NULL result to signal end-of-results for this command */
815  if ((res = PQgetResult(conn)) != NULL)
816  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
817 
819  pg_fatal("pipeline should be flagged as aborted but isn't");
820 
821  /* Ensure we're still in pipeline */
823  pg_fatal("Fell out of pipeline mode somehow");
824 
825  /*
826  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
827  *
828  * (This is so clients know to start processing results normally again and
829  * can tell the difference between skipped commands and the sync.)
830  */
831  res = PQgetResult(conn);
832  if (res == NULL)
833  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
835  pg_fatal("Unexpected result code from first pipeline sync\n"
836  "Expected PGRES_PIPELINE_SYNC, got %s",
838  PQclear(res);
839 
841  pg_fatal("sync should've cleared the aborted flag but didn't");
842 
843  /* We're still in pipeline mode... */
845  pg_fatal("Fell out of pipeline mode somehow");
846 
847  /* the insert from the second pipeline */
848  res = PQgetResult(conn);
849  if (res == NULL)
850  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
852  pg_fatal("Unexpected result code %s from first item in second pipeline",
854  PQclear(res);
855 
856  /* Read the NULL result at the end of the command */
857  if ((res = PQgetResult(conn)) != NULL)
858  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
859 
860  /* the second pipeline sync */
861  if ((res = PQgetResult(conn)) == NULL)
862  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
864  pg_fatal("Unexpected result code %s from second pipeline sync",
866  PQclear(res);
867 
868  if ((res = PQgetResult(conn)) != NULL)
869  pg_fatal("Expected null result, got %s: %s",
872 
873  /* Try to send two queries in one command */
874  if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
875  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
876  if (PQpipelineSync(conn) != 1)
877  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
878  goterror = false;
879  while ((res = PQgetResult(conn)) != NULL)
880  {
881  switch (PQresultStatus(res))
882  {
883  case PGRES_FATAL_ERROR:
884  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
885  pg_fatal("expected error about multiple commands, got %s",
887  printf("got expected %s", PQerrorMessage(conn));
888  goterror = true;
889  break;
890  default:
891  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
892  break;
893  }
894  }
895  if (!goterror)
896  pg_fatal("did not get cannot-insert-multiple-commands error");
897  res = PQgetResult(conn);
898  if (res == NULL)
899  pg_fatal("got NULL result");
901  pg_fatal("Unexpected result code %s from pipeline sync",
903  fprintf(stderr, "ok\n");
904 
905  /* Test single-row mode with an error partways */
906  if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
907  0, NULL, NULL, NULL, NULL, 0) != 1)
908  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
909  if (PQpipelineSync(conn) != 1)
910  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
912  goterror = false;
913  gotrows = 0;
914  while ((res = PQgetResult(conn)) != NULL)
915  {
916  switch (PQresultStatus(res))
917  {
918  case PGRES_SINGLE_TUPLE:
919  printf("got row: %s\n", PQgetvalue(res, 0, 0));
920  gotrows++;
921  break;
922  case PGRES_FATAL_ERROR:
923  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
924  pg_fatal("expected division-by-zero, got: %s (%s)",
927  printf("got expected division-by-zero\n");
928  goterror = true;
929  break;
930  default:
931  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
932  }
933  PQclear(res);
934  }
935  if (!goterror)
936  pg_fatal("did not get division-by-zero error");
937  if (gotrows != 3)
938  pg_fatal("did not get three rows");
939  /* the third pipeline sync */
940  if ((res = PQgetResult(conn)) == NULL)
941  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
943  pg_fatal("Unexpected result code %s from third pipeline sync",
945  PQclear(res);
946 
947  /* We're still in pipeline mode... */
949  pg_fatal("Fell out of pipeline mode somehow");
950 
951  /* until we end it, which we can safely do now */
952  if (PQexitPipelineMode(conn) != 1)
953  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
955 
957  pg_fatal("exiting pipeline mode didn't seem to work");
958 
959  /*-
960  * Since we fired the pipelines off without a surrounding xact, the results
961  * should be:
962  *
963  * - Implicit xact started by server around 1st pipeline
964  * - First insert applied
965  * - Second statement aborted xact
966  * - Third insert skipped
967  * - Sync rolled back first implicit xact
968  * - Implicit xact created by server around 2nd pipeline
969  * - insert applied from 2nd pipeline
970  * - Sync commits 2nd xact
971  *
972  * So we should only have the value 3 that we inserted.
973  */
974  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
975 
977  pg_fatal("Expected tuples, got %s: %s",
979  if (PQntuples(res) != 1)
980  pg_fatal("expected 1 result, got %d", PQntuples(res));
981  for (i = 0; i < PQntuples(res); i++)
982  {
983  const char *val = PQgetvalue(res, i, 0);
984 
985  if (strcmp(val, "3") != 0)
986  pg_fatal("expected only insert with value 3, got %s", val);
987  }
988 
989  PQclear(res);
990 
991  fprintf(stderr, "ok\n");
992 }
993 
/*
 * State machine enum for test_pipelined_insert
 *
 * The enumerators are listed in the order the steps are performed; the test
 * advances its send and receive states with ++, so the order matters.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX,
	BI_DROP_TABLE,
	BI_CREATE_TABLE,
	BI_PREPARE,
	BI_INSERT_ROWS,
	BI_COMMIT_TX,
	BI_SYNC,
	BI_DONE
};
1006 
1007 static void
1009 {
1010  Oid insert_param_oids[2] = {INT4OID, INT8OID};
1011  const char *insert_params[2];
1012  char insert_param_0[MAXINTLEN];
1013  char insert_param_1[MAXINT8LEN];
1014  enum PipelineInsertStep send_step = BI_BEGIN_TX,
1015  recv_step = BI_BEGIN_TX;
1016  int rows_to_send,
1017  rows_to_receive;
1018 
1019  insert_params[0] = insert_param_0;
1020  insert_params[1] = insert_param_1;
1021 
1022  rows_to_send = rows_to_receive = n_rows;
1023 
1024  /*
1025  * Do a pipelined insert into a table created at the start of the pipeline
1026  */
1027  if (PQenterPipelineMode(conn) != 1)
1028  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1029 
1030  while (send_step != BI_PREPARE)
1031  {
1032  const char *sql;
1033 
1034  switch (send_step)
1035  {
1036  case BI_BEGIN_TX:
1037  sql = "BEGIN TRANSACTION";
1038  send_step = BI_DROP_TABLE;
1039  break;
1040 
1041  case BI_DROP_TABLE:
1042  sql = drop_table_sql;
1043  send_step = BI_CREATE_TABLE;
1044  break;
1045 
1046  case BI_CREATE_TABLE:
1047  sql = create_table_sql;
1048  send_step = BI_PREPARE;
1049  break;
1050 
1051  default:
1052  pg_fatal("invalid state");
1053  sql = NULL; /* keep compiler quiet */
1054  }
1055 
1056  pg_debug("sending: %s\n", sql);
1057  if (PQsendQueryParams(conn, sql,
1058  0, NULL, NULL, NULL, NULL, 0) != 1)
1059  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1060  }
1061 
1062  Assert(send_step == BI_PREPARE);
1063  pg_debug("sending: %s\n", insert_sql2);
1064  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1065  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1066  send_step = BI_INSERT_ROWS;
1067 
1068  /*
1069  * Now we start inserting. We'll be sending enough data that we could fill
1070  * our output buffer, so to avoid deadlocking we need to enter nonblocking
1071  * mode and consume input while we send more output. As results of each
1072  * query are processed we should pop them to allow processing of the next
1073  * query. There's no need to finish the pipeline before processing
1074  * results.
1075  */
1076  if (PQsetnonblocking(conn, 1) != 0)
1077  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1078 
1079  while (recv_step != BI_DONE)
1080  {
1081  int sock;
1082  fd_set input_mask;
1083  fd_set output_mask;
1084 
1085  sock = PQsocket(conn);
1086 
1087  if (sock < 0)
1088  break; /* shouldn't happen */
1089 
1090  FD_ZERO(&input_mask);
1091  FD_SET(sock, &input_mask);
1092  FD_ZERO(&output_mask);
1093  FD_SET(sock, &output_mask);
1094 
1095  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1096  {
1097  fprintf(stderr, "select() failed: %m\n");
1098  exit_nicely(conn);
1099  }
1100 
1101  /*
1102  * Process any results, so we keep the server's output buffer free
1103  * flowing and it can continue to process input
1104  */
1105  if (FD_ISSET(sock, &input_mask))
1106  {
1108 
1109  /* Read until we'd block if we tried to read */
1110  while (!PQisBusy(conn) && recv_step < BI_DONE)
1111  {
1112  PGresult *res;
1113  const char *cmdtag = "";
1114  const char *description = "";
1115  int status;
1116 
1117  /*
1118  * Read next result. If no more results from this query,
1119  * advance to the next query
1120  */
1121  res = PQgetResult(conn);
1122  if (res == NULL)
1123  continue;
1124 
1125  status = PGRES_COMMAND_OK;
1126  switch (recv_step)
1127  {
1128  case BI_BEGIN_TX:
1129  cmdtag = "BEGIN";
1130  recv_step++;
1131  break;
1132  case BI_DROP_TABLE:
1133  cmdtag = "DROP TABLE";
1134  recv_step++;
1135  break;
1136  case BI_CREATE_TABLE:
1137  cmdtag = "CREATE TABLE";
1138  recv_step++;
1139  break;
1140  case BI_PREPARE:
1141  cmdtag = "";
1142  description = "PREPARE";
1143  recv_step++;
1144  break;
1145  case BI_INSERT_ROWS:
1146  cmdtag = "INSERT";
1147  rows_to_receive--;
1148  if (rows_to_receive == 0)
1149  recv_step++;
1150  break;
1151  case BI_COMMIT_TX:
1152  cmdtag = "COMMIT";
1153  recv_step++;
1154  break;
1155  case BI_SYNC:
1156  cmdtag = "";
1157  description = "SYNC";
1158  status = PGRES_PIPELINE_SYNC;
1159  recv_step++;
1160  break;
1161  case BI_DONE:
1162  /* unreachable */
1163  pg_fatal("unreachable state");
1164  }
1165 
1166  if (PQresultStatus(res) != status)
1167  pg_fatal("%s reported status %s, expected %s\n"
1168  "Error message: \"%s\"",
1170  PQresStatus(status), PQerrorMessage(conn));
1171 
1172  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1173  pg_fatal("%s expected command tag '%s', got '%s'",
1174  description, cmdtag, PQcmdStatus(res));
1175 
1176  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1177 
1178  PQclear(res);
1179  }
1180  }
1181 
1182  /* Write more rows and/or the end pipeline message, if needed */
1183  if (FD_ISSET(sock, &output_mask))
1184  {
1185  PQflush(conn);
1186 
1187  if (send_step == BI_INSERT_ROWS)
1188  {
1189  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1190  /* use up some buffer space with a wide value */
1191  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1192 
1193  if (PQsendQueryPrepared(conn, "my_insert",
1194  2, insert_params, NULL, NULL, 0) == 1)
1195  {
1196  pg_debug("sent row %d\n", rows_to_send);
1197 
1198  rows_to_send--;
1199  if (rows_to_send == 0)
1200  send_step++;
1201  }
1202  else
1203  {
1204  /*
1205  * in nonblocking mode, so it's OK for an insert to fail
1206  * to send
1207  */
1208  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1209  rows_to_send, PQerrorMessage(conn));
1210  }
1211  }
1212  else if (send_step == BI_COMMIT_TX)
1213  {
1214  if (PQsendQueryParams(conn, "COMMIT",
1215  0, NULL, NULL, NULL, NULL, 0) == 1)
1216  {
1217  pg_debug("sent COMMIT\n");
1218  send_step++;
1219  }
1220  else
1221  {
1222  fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223  PQerrorMessage(conn));
1224  }
1225  }
1226  else if (send_step == BI_SYNC)
1227  {
1228  if (PQpipelineSync(conn) == 1)
1229  {
1230  fprintf(stdout, "pipeline sync sent\n");
1231  send_step++;
1232  }
1233  else
1234  {
1235  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236  PQerrorMessage(conn));
1237  }
1238  }
1239  }
1240  }
1241 
1242  /* We've got the sync message and the pipeline should be done */
1243  if (PQexitPipelineMode(conn) != 1)
1244  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245  PQerrorMessage(conn));
1246 
1247  if (PQsetnonblocking(conn, 0) != 0)
1248  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1249 
1250  fprintf(stderr, "ok\n");
1251 }
1252 
/*
 * Exercise describing and closing a prepared statement ("select_one") and a
 * portal ("cursor_one"), first through the pipelined PQsend* calls and then
 * through the blocking PQdescribe* / PQclose* calls.  Every unexpected
 * outcome is reported with pg_fatal(); on success "ok" is printed to stderr.
 *
 * NOTE(review): this listing lacks the line carrying the function name and
 * parameter list (presumably test_prepared(PGconn *conn), judging by the
 * "prepared" test name and the use of conn below) as well as the conditions
 * guarding several of the pg_fatal() calls -- confirm against the complete
 * file before relying on the control flow shown here.
 */
1253 static void
1255 {
1256  PGresult *res = NULL;
1257  Oid param_oids[1] = {INT4OID};
1258  Oid expected_oids[4];
1259  Oid typ;
1260 
1261  fprintf(stderr, "prepared... ");
1262 
  /* Queue the prepare and the describe, followed by a sync point */
1263  if (PQenterPipelineMode(conn) != 1)
1264  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1265  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1266  "interval '1 sec'",
1267  1, param_oids) != 1)
1268  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
  /* Column types that the describe result is checked against below */
1269  expected_oids[0] = INT4OID;
1270  expected_oids[1] = TEXTOID;
1271  expected_oids[2] = NUMERICOID;
1272  expected_oids[3] = INTERVALOID;
1273  if (PQsendDescribePrepared(conn, "select_one") != 1)
1274  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1275  if (PQpipelineSync(conn) != 1)
1276  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1277 
  /* First pipeline item (the prepare), then its terminating NULL */
1278  res = PQgetResult(conn);
1279  if (res == NULL)
1280  pg_fatal("PQgetResult returned null");
  /* NOTE(review): the condition guarding the next pg_fatal() is not visible here */
1282  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1283  PQclear(res);
1284  res = PQgetResult(conn);
1285  if (res != NULL)
1286  pg_fatal("expected NULL result");
1287 
  /* Second item (the describe): verify column count and per-column type OIDs */
1288  res = PQgetResult(conn);
1289  if (res == NULL)
1290  pg_fatal("PQgetResult returned NULL");
1292  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1293  if (PQnfields(res) != lengthof(expected_oids))
1294  pg_fatal("expected %zu columns, got %d",
1295  lengthof(expected_oids), PQnfields(res));
1296  for (int i = 0; i < PQnfields(res); i++)
1297  {
1298  typ = PQftype(res, i);
1299  if (typ != expected_oids[i])
1300  pg_fatal("field %d: expected type %u, got %u",
1301  i, expected_oids[i], typ);
1302  }
1303  PQclear(res);
1304  res = PQgetResult(conn);
1305  if (res != NULL)
1306  pg_fatal("expected NULL result");
1307 
  /* Result for the sync point queued above */
1308  res = PQgetResult(conn);
1310  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1311 
  /* Close the statement through the pipeline and read back its results */
1312  fprintf(stderr, "closing statement..");
1313  if (PQsendClosePrepared(conn, "select_one") != 1)
1314  pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1315  if (PQpipelineSync(conn) != 1)
1316  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1317 
1318  res = PQgetResult(conn);
1319  if (res == NULL)
1320  pg_fatal("expected non-NULL result");
1322  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1323  PQclear(res);
1324  res = PQgetResult(conn);
1325  if (res != NULL)
1326  pg_fatal("expected NULL result");
1327  res = PQgetResult(conn);
1329  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1330 
1331  if (PQexitPipelineMode(conn) != 1)
1332  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1333 
1334  /* Now that it's closed we should get an error when describing */
1335  res = PQdescribePrepared(conn, "select_one");
1337  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1338 
1339  /*
1340  * Also test the blocking close, this should not fail since closing a
1341  * non-existent prepared statement is a no-op
1342  */
1343  res = PQclosePrepared(conn, "select_one");
1345  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1346 
  /*
   * Same sequence for a portal.  The results of the two PQexec() calls are
   * neither checked nor cleared here.
   *
   * NOTE(review): pipeline mode was exited above and nothing visible
   * re-enters it before the PQsend* and PQpipelineSync calls below; a line
   * appears to be missing from this listing after the DECLARE -- confirm.
   */
1347  fprintf(stderr, "creating portal... ");
1348  PQexec(conn, "BEGIN");
1349  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1351  if (PQsendDescribePortal(conn, "cursor_one") != 1)
1352  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1353  if (PQpipelineSync(conn) != 1)
1354  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1355  res = PQgetResult(conn);
1356  if (res == NULL)
1357  pg_fatal("PQgetResult returned null");
1359  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1360 
  /* The cursor selects the constant 1, so its only column must be int4 */
1361  typ = PQftype(res, 0);
1362  if (typ != INT4OID)
1363  pg_fatal("portal: expected type %u, got %u",
1364  INT4OID, typ);
1365  PQclear(res);
1366  res = PQgetResult(conn);
1367  if (res != NULL)
1368  pg_fatal("expected NULL result");
1369  res = PQgetResult(conn);
1371  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1372 
1373  fprintf(stderr, "closing portal... ");
1374  if (PQsendClosePortal(conn, "cursor_one") != 1)
1375  pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1376  if (PQpipelineSync(conn) != 1)
1377  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1378 
1379  res = PQgetResult(conn);
1380  if (res == NULL)
1381  pg_fatal("expected non-NULL result");
1383  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1384  PQclear(res);
1385  res = PQgetResult(conn);
1386  if (res != NULL)
1387  pg_fatal("expected NULL result");
1388  res = PQgetResult(conn);
1390  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1391 
1392  if (PQexitPipelineMode(conn) != 1)
1393  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1394 
1395  /* Now that it's closed we should get an error when describing */
1396  res = PQdescribePortal(conn, "cursor_one");
1398  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1399 
1400  /*
1401  * Also test the blocking close, this should not fail since closing a
1402  * non-existent portal is a no-op
1403  */
1404  res = PQclosePortal(conn, "cursor_one");
1406  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1407 
1408  fprintf(stderr, "ok\n");
1409 }
1410 
/*
 * Notice processor callback: bump the caller-supplied counter (arg points to
 * an int) and echo the notice text to stderr, prefixed with the new count.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;

	*counter += 1;
	fprintf(stderr, "NOTICE %d: %s", *counter, message);
}
1420 
/*
 * Part 1: in pipeline mode, send "SELECT 1" and read its result plus the
 * terminating NULL, then send "SELECT 2" and check that PQexitPipelineMode()
 * refuses to exit (error text starting with "cannot exit pipeline mode")
 * while that query is outstanding.  After its result has been read, exiting
 * must succeed and no notices may have been counted.
 *
 * Part 2: run a query that is expected to raise a WARNING, read its result
 * and exit pipeline mode.
 *
 * NOTE(review): this listing lacks the function name line (presumably
 * test_pipeline_idle(PGconn *conn)), several guard conditions and a few
 * statements.  n_notices is never visibly updated here; presumably a dropped
 * line installs notice_processor with &n_notices -- confirm against the
 * complete file.
 */
1421 /* Verify behavior in "idle" state */
1422 static void
1424 {
1425  PGresult *res;
1426  int n_notices = 0;
1427 
1428  fprintf(stderr, "\npipeline idle...\n");
1429 
1431 
1432  /* Try to exit pipeline mode in pipeline-idle state */
1433  if (PQenterPipelineMode(conn) != 1)
1434  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1435  if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1436  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1438  res = PQgetResult(conn);
1439  if (res == NULL)
1440  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441  PQerrorMessage(conn));
1443  pg_fatal("unexpected result code %s from first pipeline item",
1445  PQclear(res);
1446  res = PQgetResult(conn);
1447  if (res != NULL)
1448  pg_fatal("did not receive terminating NULL");
  /* With a second query outstanding, leaving pipeline mode must be refused */
1449  if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1450  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1451  if (PQexitPipelineMode(conn) == 1)
1452  pg_fatal("exiting pipeline succeeded when it shouldn't");
1453  if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1454  strlen("cannot exit pipeline mode")) != 0)
1455  pg_fatal("did not get expected error; got: %s",
1456  PQerrorMessage(conn));
1458  res = PQgetResult(conn);
1460  pg_fatal("unexpected result code %s from second pipeline item",
1462  PQclear(res);
1463  res = PQgetResult(conn);
1464  if (res != NULL)
1465  pg_fatal("did not receive terminating NULL");
  /* Everything has been consumed, so exiting must work now */
1466  if (PQexitPipelineMode(conn) != 1)
1467  pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1468 
1469  if (n_notices > 0)
1470  pg_fatal("got %d notice(s)", n_notices);
1471  fprintf(stderr, "ok - 1\n");
1472 
1473  /* Have a WARNING in the middle of a resultset */
1474  if (PQenterPipelineMode(conn) != 1)
1475  pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1476  if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1477  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1479  res = PQgetResult(conn);
1480  if (res == NULL)
1481  pg_fatal("unexpected NULL result received");
1483  pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1484  if (PQexitPipelineMode(conn) != 1)
1485  pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1486  fprintf(stderr, "ok - 2\n");
1487 }
1488 
/*
 * Smallest pipeline round trip on a blocking connection: send one
 * parameterized SELECT plus a sync, then read the query result, its
 * terminating NULL, the sync result and the final NULL.  Along the way,
 * check that PQexitPipelineMode() returns 0 (refuses) while the query or the
 * sync is still outstanding and returns 1 once everything has been read.
 *
 * NOTE(review): this listing lacks the function name line (presumably
 * test_simple_pipeline(PGconn *conn)), several guard conditions and the
 * trailing arguments of some pg_fatal() calls -- confirm against the
 * complete file.
 */
1489 static void
1491 {
1492  PGresult *res = NULL;
1493  const char *dummy_params[1] = {"1"};
1494  Oid dummy_param_oids[1] = {INT4OID};
1495 
1496  fprintf(stderr, "simple pipeline... ");
1497 
1498  /*
1499  * Enter pipeline mode and dispatch a set of operations, which we'll then
1500  * process the results of as they come in.
1501  *
1502  * For a simple case we should be able to do this without interim
1503  * processing of results since our output buffer will give us enough slush
1504  * to work with and we won't block on sending. So blocking mode is fine.
1505  */
1506  if (PQisnonblocking(conn))
1507  pg_fatal("Expected blocking connection mode");
1508 
1509  if (PQenterPipelineMode(conn) != 1)
1510  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1511 
1512  if (PQsendQueryParams(conn, "SELECT $1",
1513  1, dummy_param_oids, dummy_params,
1514  NULL, NULL, 0) != 1)
1515  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1516 
  /* A query has been queued, so leaving pipeline mode must be refused */
1517  if (PQexitPipelineMode(conn) != 0)
1518  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1519 
1520  if (PQpipelineSync(conn) != 1)
1521  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1522 
1523  res = PQgetResult(conn);
1524  if (res == NULL)
1525  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1526  PQerrorMessage(conn));
1527 
1529  pg_fatal("Unexpected result code %s from first pipeline item",
1531 
1532  PQclear(res);
1533  res = NULL;
1534 
1535  if (PQgetResult(conn) != NULL)
1536  pg_fatal("PQgetResult returned something extra after first query result.");
1537 
1538  /*
1539  * Even though we've processed the result there's still a sync to come and
1540  * we can't exit pipeline mode yet
1541  */
1542  if (PQexitPipelineMode(conn) != 0)
1543  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1544 
1545  res = PQgetResult(conn);
1546  if (res == NULL)
1547  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1548  PQerrorMessage(conn));
1549 
1551  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1553 
1554  PQclear(res);
1555  res = NULL;
1556 
1557  if (PQgetResult(conn) != NULL)
1558  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1560 
1561  /* We're still in pipeline mode... */
1563  pg_fatal("Fell out of pipeline mode somehow");
1564 
1565  /* ... until we end it, which we can safely do now */
1566  if (PQexitPipelineMode(conn) != 1)
1567  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1568  PQerrorMessage(conn));
1569 
1571  pg_fatal("Exiting pipeline mode didn't seem to work");
1572 
1573  fprintf(stderr, "ok\n");
1574 }
1575 
/*
 * Exercise single-row and chunked-rows result retrieval in pipeline mode.
 *
 * 1. Send three generate_series() queries plus a sync; request single-row
 *    mode for the first two only and check the status of the results of each
 *    query until the sync result is seen.
 * 2. Send one query read in single-row mode and then one read normally,
 *    each followed by a flush request, to check that the single-row setting
 *    does not carry over to the next query.
 * 3. Send a five-row query with PQsetChunkedRowsMode(conn, 3) and expect
 *    results holding 3, 2 and finally 0 rows.
 *
 * NOTE(review): this listing lacks the function name line (presumably
 * test_singlerow(PGconn *conn)), the declaration of "est" used in the result
 * loop, and the conditions/arguments of several pg_fatal() calls -- confirm
 * against the complete file.
 */
1576 static void
1578 {
1579  PGresult *res;
1580  int i;
1581  bool pipeline_ended = false;
1582 
1583  if (PQenterPipelineMode(conn) != 1)
1584  pg_fatal("failed to enter pipeline mode: %s",
1585  PQerrorMessage(conn));
1586 
1587  /* One series of three commands, using single-row mode for the first two. */
1588  for (i = 0; i < 3; i++)
1589  {
1590  char *param[1];
1591 
  /* Upper bound of the series: 44, 45, 46 for the three queries */
1592  param[0] = psprintf("%d", 44 + i);
1593 
1594  if (PQsendQueryParams(conn,
1595  "SELECT generate_series(42, $1)",
1596  1,
1597  NULL,
1598  (const char **) param,
1599  NULL,
1600  NULL,
1601  0) != 1)
1602  pg_fatal("failed to send query: %s",
1603  PQerrorMessage(conn));
1604  pfree(param[0]);
1605  }
1606  if (PQpipelineSync(conn) != 1)
1607  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1608 
  /* i counts queries whose results have been consumed; stop at the sync */
1609  for (i = 0; !pipeline_ended; i++)
1610  {
1611  bool first = true;
1612  bool saw_ending_tuplesok;
1613  bool isSingleTuple = false;
1614 
1615  /* Set single row mode for only first 2 SELECT queries */
1616  if (i < 2)
1617  {
1618  if (PQsetSingleRowMode(conn) != 1)
1619  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1620  }
1621 
1622  /* Consume rows for this query */
1623  saw_ending_tuplesok = false;
1624  while ((res = PQgetResult(conn)) != NULL)
1625  {
  /* NOTE(review): "est" is used below but its declaration is not visible here */
1627 
1628  if (est == PGRES_PIPELINE_SYNC)
1629  {
1630  fprintf(stderr, "end of pipeline reached\n");
1631  pipeline_ended = true;
1632  PQclear(res);
1633  if (i != 3)
1634  pg_fatal("Expected three results, got %d", i);
1635  break;
1636  }
1637 
1638  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1639  if (first)
1640  {
1641  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1642  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1643  i, PQresStatus(est));
1644  if (i >= 2 && est != PGRES_TUPLES_OK)
1645  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1646  i, PQresStatus(est));
1647  first = false;
1648  }
1649 
1650  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1651  switch (est)
1652  {
1653  case PGRES_TUPLES_OK:
1654  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1655  saw_ending_tuplesok = true;
  /* After single-row results, the closing TUPLES_OK must carry no rows */
1656  if (isSingleTuple)
1657  {
1658  if (PQntuples(res) == 0)
1659  fprintf(stderr, "all tuples received in query %d\n", i);
1660  else
1661  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1662  }
1663  break;
1664 
1665  case PGRES_SINGLE_TUPLE:
1666  isSingleTuple = true;
1667  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1668  break;
1669 
1670  default:
1671  pg_fatal("unexpected");
1672  }
1673  PQclear(res);
1674  }
1675  if (!pipeline_ended && !saw_ending_tuplesok)
1676  pg_fatal("didn't get expected terminating TUPLES_OK");
1677  }
1678 
1679  /*
1680  * Now issue one command, get its results in with single-row mode, then
1681  * issue another command, and get its results in normal mode; make sure
1682  * the single-row mode flag is reset as expected.
1683  */
1684  if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1685  0, NULL, NULL, NULL, NULL, 0) != 1)
1686  pg_fatal("failed to send query: %s",
1687  PQerrorMessage(conn));
1688  if (PQsendFlushRequest(conn) != 1)
1689  pg_fatal("failed to send flush request");
1690  if (PQsetSingleRowMode(conn) != 1)
1691  pg_fatal("PQsetSingleRowMode() failed");
1692  res = PQgetResult(conn);
1693  if (res == NULL)
1694  pg_fatal("unexpected NULL");
1696  pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1698  res = PQgetResult(conn);
1699  if (res == NULL)
1700  pg_fatal("unexpected NULL");
1702  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1704  if (PQgetResult(conn) != NULL)
1705  pg_fatal("expected NULL result");
1706 
  /* Second command: no PQsetSingleRowMode() call this time */
1707  if (PQsendQueryParams(conn, "SELECT 1",
1708  0, NULL, NULL, NULL, NULL, 0) != 1)
1709  pg_fatal("failed to send query: %s",
1710  PQerrorMessage(conn));
1711  if (PQsendFlushRequest(conn) != 1)
1712  pg_fatal("failed to send flush request");
1713  res = PQgetResult(conn);
1714  if (res == NULL)
1715  pg_fatal("unexpected NULL");
1717  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1719  if (PQgetResult(conn) != NULL)
1720  pg_fatal("expected NULL result");
1721 
1722  /*
1723  * Try chunked mode as well; make sure that it correctly delivers a
1724  * partial final chunk.
1725  */
1726  if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1727  0, NULL, NULL, NULL, NULL, 0) != 1)
1728  pg_fatal("failed to send query: %s",
1729  PQerrorMessage(conn));
1730  if (PQsendFlushRequest(conn) != 1)
1731  pg_fatal("failed to send flush request");
1732  if (PQsetChunkedRowsMode(conn, 3) != 1)
1733  pg_fatal("PQsetChunkedRowsMode() failed");
  /* Five rows with a chunk size of 3: expect 3 rows, then 2, then 0 */
1734  res = PQgetResult(conn);
1735  if (res == NULL)
1736  pg_fatal("unexpected NULL");
1738  pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1740  PQerrorMessage(conn));
1741  if (PQntuples(res) != 3)
1742  pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1743  res = PQgetResult(conn);
1744  if (res == NULL)
1745  pg_fatal("unexpected NULL");
1747  pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1749  if (PQntuples(res) != 2)
1750  pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1751  res = PQgetResult(conn);
1752  if (res == NULL)
1753  pg_fatal("unexpected NULL");
1755  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1757  if (PQntuples(res) != 0)
1758  pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1759  if (PQgetResult(conn) != NULL)
1760  pg_fatal("expected NULL result");
1761 
1762  if (PQexitPipelineMode(conn) != 1)
1763  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1764 
1765  fprintf(stderr, "ok\n");
1766 }
1767 
1768 /*
1769  * Simple test to verify that a pipeline is discarded as a whole when there's
1770  * an error, ignoring transaction commands.
 *
 * Four sync points are queued in total (counted in num_syncs); the result
 * loop runs until that many PGRES_PIPELINE_SYNC results have been read.
 * Afterwards the table must contain exactly one row, with value "3".
 *
 * NOTE(review): this listing lacks the function name line (presumably
 * test_pipeline_abort(PGconn *conn)) and the conditions/arguments of a few
 * pg_fatal() calls -- confirm against the complete file.
1771  */
1772 static void
1774 {
1775  PGresult *res;
1776  bool expect_null;
1777  int num_syncs = 0;
1778 
1779  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1780  "CREATE TABLE pq_pipeline_tst (id int)");
1782  pg_fatal("failed to create test table: %s",
1783  PQerrorMessage(conn));
1784  PQclear(res);
1785 
1786  if (PQenterPipelineMode(conn) != 1)
1787  pg_fatal("failed to enter pipeline mode: %s",
1788  PQerrorMessage(conn));
1789  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1790  pg_fatal("could not send prepare on pipeline: %s",
1791  PQerrorMessage(conn));
1792 
1793  if (PQsendQueryParams(conn,
1794  "BEGIN",
1795  0, NULL, NULL, NULL, NULL, 0) != 1)
1796  pg_fatal("failed to send query: %s",
1797  PQerrorMessage(conn));
  /* Division by zero: the deliberate error for this pipeline */
1798  if (PQsendQueryParams(conn,
1799  "SELECT 0/0",
1800  0, NULL, NULL, NULL, NULL, 0) != 1)
1801  pg_fatal("failed to send query: %s",
1802  PQerrorMessage(conn));
1803 
1804  /*
1805  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1806  * get out of the pipeline-aborted state first.
1807  */
1808  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1809  pg_fatal("failed to execute prepared: %s",
1810  PQerrorMessage(conn));
1811 
1812  /* This insert fails because we're in pipeline-aborted state */
1813  if (PQsendQueryParams(conn,
1814  "INSERT INTO pq_pipeline_tst VALUES (1)",
1815  0, NULL, NULL, NULL, NULL, 0) != 1)
1816  pg_fatal("failed to send query: %s",
1817  PQerrorMessage(conn));
1818  if (PQpipelineSync(conn) != 1)
1819  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1820  num_syncs++;
1821 
1822  /*
1823  * This insert fails even though the pipeline got a SYNC, because we're in
1824  * an aborted transaction
1825  */
1826  if (PQsendQueryParams(conn,
1827  "INSERT INTO pq_pipeline_tst VALUES (2)",
1828  0, NULL, NULL, NULL, NULL, 0) != 1)
1829  pg_fatal("failed to send query: %s",
1830  PQerrorMessage(conn));
1831  if (PQpipelineSync(conn) != 1)
1832  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1833  num_syncs++;
1834 
1835  /*
1836  * Send ROLLBACK using prepared stmt. This one works because we just did
1837  * PQpipelineSync above.
1838  */
1839  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1840  pg_fatal("failed to execute prepared: %s",
1841  PQerrorMessage(conn));
1842 
1843  /*
1844  * Now that we're out of a transaction and in pipeline-good mode, this
1845  * insert works
1846  */
1847  if (PQsendQueryParams(conn,
1848  "INSERT INTO pq_pipeline_tst VALUES (3)",
1849  0, NULL, NULL, NULL, NULL, 0) != 1)
1850  pg_fatal("failed to send query: %s",
1851  PQerrorMessage(conn));
1852  /* Send two syncs now -- match up to SYNC messages below */
1853  if (PQpipelineSync(conn) != 1)
1854  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1855  num_syncs++;
1856  if (PQpipelineSync(conn) != 1)
1857  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1858  num_syncs++;
1859 
  /*
   * Drain the results.  Every result other than a sync must be followed by
   * exactly one NULL (tracked with expect_null); a sync result is not.  The
   * loop ends once all queued syncs have been accounted for.
   */
1860  expect_null = false;
1861  for (int i = 0;; i++)
1862  {
1863  ExecStatusType restype;
1864 
1865  res = PQgetResult(conn);
1866  if (res == NULL)
1867  {
1868  printf("%d: got NULL result\n", i);
1869  if (!expect_null)
1870  pg_fatal("did not expect NULL here");
1871  expect_null = false;
1872  continue;
1873  }
1874  restype = PQresultStatus(res);
1875  printf("%d: got status %s", i, PQresStatus(restype));
1876  if (expect_null)
1877  pg_fatal("expected NULL");
1878  if (restype == PGRES_FATAL_ERROR)
1879  printf("; error: %s", PQerrorMessage(conn));
1880  else if (restype == PGRES_PIPELINE_ABORTED)
1881  {
1882  printf(": command didn't run because pipeline aborted\n");
1883  }
1884  else
1885  printf("\n");
1886  PQclear(res);
1887 
1888  if (restype == PGRES_PIPELINE_SYNC)
1889  num_syncs--;
1890  else
1891  expect_null = true;
1892  if (num_syncs <= 0)
1893  break;
1894  }
1895  if (PQgetResult(conn) != NULL)
1896  pg_fatal("returned something extra after all the syncs: %s",
1898 
1899  if (PQexitPipelineMode(conn) != 1)
1900  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1901 
1902  /* We expect to find one tuple containing the value "3" */
1903  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1905  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1906  if (PQntuples(res) != 1)
1907  pg_fatal("did not get 1 tuple");
1908  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1909  pg_fatal("did not get expected tuple");
1910  PQclear(res);
1911 
1912  fprintf(stderr, "ok\n");
1913 }
1914 
1915 /*
1916  * In this test mode we send a stream of queries, with one in the middle
1917  * causing an error. Verify that we can still send some more after the
1918  * error and have libpq work properly.
 *
 * The connection is put in nonblocking mode and driven with select(): the
 * loop first consumes every available result (via process_result), then
 * sends inserts until PQflush() reports that output is still pending.
 * "socketful" records how many queries had been sent the first time that
 * happened, and "switched" counts how often we stopped writing for that
 * reason.  Exactly one insert reuses an already-sent id (numsent / 2) to
 * provoke the uniqueness violation.
 *
 * NOTE(review): this listing lacks the function name line (presumably
 * test_uniqviol(PGconn *conn)) and the guard conditions in front of the
 * first two pg_fatal() calls -- confirm against the complete file.
1919  */
1920 static void
1922 {
1923  int sock = PQsocket(conn);
1924  PGresult *res;
1925  Oid paramTypes[2] = {INT8OID, INT8OID};
1926  const char *paramValues[2];
1927  char paramValue0[MAXINT8LEN];
1928  char paramValue1[MAXINT8LEN];
1929  int ctr = 0;
1930  int numsent = 0;
1931  int results = 0;
1932  bool read_done = false;
1933  bool write_done = false;
1934  bool error_sent = false;
1935  bool got_error = false;
1936  int switched = 0;
1937  int socketful = 0;
1938  fd_set in_fds;
1939  fd_set out_fds;
1940 
1941  fprintf(stderr, "uniqviol ...");
1942 
  /* The return value of PQsetnonblocking() is not checked here */
1943  PQsetnonblocking(conn, 1);
1944 
  /* The second parameter is the constant "42"; the first is rewritten per query */
1945  paramValues[0] = paramValue0;
1946  paramValues[1] = paramValue1;
1947  sprintf(paramValue1, "42");
1948 
1949  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1950  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1952  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1953 
1954  res = PQexec(conn, "begin");
1956  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1957 
1958  res = PQprepare(conn, "insertion",
1959  "insert into ppln_uniqviol values ($1, $2) returning id",
1960  2, paramTypes);
1961  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1962  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1963 
1964  if (PQenterPipelineMode(conn) != 1)
1965  pg_fatal("failed to enter pipeline mode");
1966 
1967  while (!read_done)
1968  {
1969  /*
1970  * Avoid deadlocks by reading everything the server has sent before
1971  * sending anything. (Special precaution is needed here to process
1972  * PQisBusy before testing the socket for read-readiness, because the
1973  * socket does not turn read-ready after "sending" queries in aborted
1974  * pipeline mode.)
1975  */
1976  while (PQisBusy(conn) == 0)
1977  {
1978  bool new_error;
1979 
  /* Nothing outstanding: finished if writing is over, else go send more */
1980  if (results >= numsent)
1981  {
1982  if (write_done)
1983  read_done = true;
1984  break;
1985  }
1986 
1987  res = PQgetResult(conn);
1988  new_error = process_result(conn, res, results, numsent);
1989  if (new_error && got_error)
1990  pg_fatal("got two errors");
1991  got_error |= new_error;
  /* Count this result; leave the loop if it was the last one outstanding */
1992  if (results++ >= numsent - 1)
1993  {
1994  if (write_done)
1995  read_done = true;
1996  break;
1997  }
1998  }
1999 
2000  if (read_done)
2001  break;
2002 
2003  FD_ZERO(&out_fds);
2004  FD_SET(sock, &out_fds);
2005 
2006  FD_ZERO(&in_fds);
2007  FD_SET(sock, &in_fds);
2008 
  /* Once writing is finished, wait for read-readiness only */
2009  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2010  {
2011  if (errno == EINTR)
2012  continue;
2013  pg_fatal("select() failed: %m");
2014  }
2015 
2016  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2017  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2018 
2019  /*
2020  * If the socket is writable and we haven't finished sending queries,
2021  * send some.
2022  */
2023  if (!write_done && FD_ISSET(sock, &out_fds))
2024  {
2025  for (;;)
2026  {
2027  int flush;
2028 
2029  /*
2030  * provoke uniqueness violation exactly once after having
2031  * switched to read mode.
 *
 * switched >= 1 implies socketful has been set to a nonzero
 * numsent, so the modulo below does not divide by zero.
2032  */
2033  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2034  {
2035  sprintf(paramValue0, "%d", numsent / 2);
2036  fprintf(stderr, "E");
2037  error_sent = true;
2038  }
2039  else
2040  {
2041  fprintf(stderr, ".");
2042  sprintf(paramValue0, "%d", ctr++);
2043  }
2044 
2045  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2046  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2047  numsent++;
2048 
2049  /* Are we done writing? */
2050  if (socketful != 0 && numsent % socketful == 42 && error_sent)
2051  {
2052  if (PQsendFlushRequest(conn) != 1)
2053  pg_fatal("failed to send flush request");
2054  write_done = true;
2055  fprintf(stderr, "\ndone writing\n");
2056  PQflush(conn);
2057  break;
2058  }
2059 
2060  /* is the outgoing socket full? */
2061  flush = PQflush(conn);
2062  if (flush == -1)
2063  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2064  if (flush == 1)
2065  {
2066  if (socketful == 0)
2067  socketful = numsent;
2068  fprintf(stderr, "\nswitch to reading\n");
2069  switched++;
2070  break;
2071  }
2072  }
2073  }
2074  }
2075 
2076  if (!got_error)
2077  pg_fatal("did not get expected error");
2078 
2079  fprintf(stderr, "ok\n");
2080 }
2081 
2082 /*
2083  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2084  * the expected NULL that should follow it.
2085  *
2086  * Returns true if we read a fatal error message, otherwise false.
2087  */
2088 static bool
2089 process_result(PGconn *conn, PGresult *res, int results, int numsent)
2090 {
2091  PGresult *res2;
2092  bool got_error = false;
2093 
2094  if (res == NULL)
2095  pg_fatal("got unexpected NULL");
2096 
2097  switch (PQresultStatus(res))
2098  {
2099  case PGRES_FATAL_ERROR:
2100  got_error = true;
2101  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2102  PQclear(res);
2103 
2104  res2 = PQgetResult(conn);
2105  if (res2 != NULL)
2106  pg_fatal("expected NULL, got %s",
2107  PQresStatus(PQresultStatus(res2)));
2108  break;
2109 
2110  case PGRES_TUPLES_OK:
2111  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2112  PQclear(res);
2113 
2114  res2 = PQgetResult(conn);
2115  if (res2 != NULL)
2116  pg_fatal("expected NULL, got %s",
2117  PQresStatus(PQresultStatus(res2)));
2118  break;
2119 
2121  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2122  res2 = PQgetResult(conn);
2123  if (res2 != NULL)
2124  pg_fatal("expected NULL, got %s",
2125  PQresStatus(PQresultStatus(res2)));
2126  break;
2127 
2128  default:
2129  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2130  }
2131 
2132  return got_error;
2133 }
2134 
2135 
/*
 * Write the help text to stderr.  progname is the program name shown in the
 * summary line and in both usage lines.
 */
static void
usage(const char *progname)
{
	static const char *const option_help =
		"\nOptions:\n"
		" -t TRACEFILE generate a libpq trace to TRACEFILE\n"
		" -r NUMROWS use NUMROWS as the test size\n";

	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n",
			progname, progname, progname);
	fputs(option_help, stderr);
}
2147 
/*
 * Print the names of all tests this program can run to stdout, one name per
 * line.  main() calls this when the test name given is "tests".
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};

	for (size_t i = 0; i < sizeof(test_names) / sizeof(test_names[0]); i++)
		printf("%s\n", test_names[i]);
}
2164 
/*
 * Program entry point.
 *
 * Parses the options -r NUMROWS and -t TRACEFILE, then takes the test name
 * and an optional conninfo string from the remaining arguments.  The test
 * name "tests" prints the list of test names and exits with status 0 before
 * any connection is made.  Otherwise a connection is opened, two session
 * settings are applied, tracing is enabled if a trace file was requested,
 * and the test selected by name is run.
 *
 * Returns 0 after the test completes; exits with status 1 on a bad -r value,
 * a missing test name, a failed connection or an unrecognized test name.
 *
 * NOTE(review): this listing lacks the statement of the 't' option case
 * (presumably it stores optarg in tracefile), the conditions guarding the
 * two "failed to set" pg_fatal() calls, the end of the trace setup, and most
 * of the test calls in the dispatch chain -- confirm against the complete
 * file.
 */
2165 int
2166 main(int argc, char **argv)
2167 {
2168  const char *conninfo = "";
2169  PGconn *conn;
2170  FILE *trace;
2171  char *testname;
2172  int numrows = 10000;
2173  PGresult *res;
2174  int c;
2175 
2176  while ((c = getopt(argc, argv, "r:t:")) != -1)
2177  {
2178  switch (c)
2179  {
2180  case 'r': /* numrows */
  /*
   * NOTE(review): strtol()'s long result is stored in an int and
   * no end pointer is passed, so trailing garbage after the
   * digits is not rejected.
   */
2181  errno = 0;
2182  numrows = strtol(optarg, NULL, 10);
2183  if (errno != 0 || numrows <= 0)
2184  {
2185  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2186  optarg);
2187  exit(1);
2188  }
2189  break;
2190  case 't': /* trace file */
2192  break;
2193  }
2194  }
2195 
  /* First non-option argument: the test name (required) */
2196  if (optind < argc)
2197  {
2198  testname = pg_strdup(argv[optind]);
2199  optind++;
2200  }
2201  else
2202  {
2203  usage(argv[0]);
2204  exit(1);
2205  }
2206 
2207  if (strcmp(testname, "tests") == 0)
2208  {
2209  print_test_list();
2210  exit(0);
2211  }
2212 
  /* Second non-option argument: the connection string (optional) */
2213  if (optind < argc)
2214  {
2215  conninfo = pg_strdup(argv[optind]);
2216  optind++;
2217  }
2218 
2219  /* Make a connection to the database */
2220  conn = PQconnectdb(conninfo);
2221  if (PQstatus(conn) != CONNECTION_OK)
2222  {
2223  fprintf(stderr, "Connection to database failed: %s\n",
2224  PQerrorMessage(conn));
2225  exit_nicely(conn);
2226  }
2227 
  /* Session settings applied before any test runs */
2228  res = PQexec(conn, "SET lc_messages TO \"C\"");
2230  pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2231  res = PQexec(conn, "SET debug_parallel_query = off");
2233  pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2234 
2235  /* Set the trace file, if requested */
2236  if (tracefile != NULL)
2237  {
  /* A trace file name of "-" sends the trace to stdout */
2238  if (strcmp(tracefile, "-") == 0)
2239  trace = stdout;
2240  else
2241  trace = fopen(tracefile, "w");
2242  if (trace == NULL)
2243  pg_fatal("could not open file \"%s\": %m", tracefile);
2244 
2245  /* Make it line-buffered */
2246  setvbuf(trace, NULL, PG_IOLBF, 0);
2247 
2248  PQtrace(conn, trace);
2251  }
2252 
  /*
   * Dispatch on the test name.  NOTE(review): most branches below have lost
   * their call statement in this listing.
   */
2253  if (strcmp(testname, "cancel") == 0)
2254  test_cancel(conn);
2255  else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2257  else if (strcmp(testname, "multi_pipelines") == 0)
2259  else if (strcmp(testname, "nosync") == 0)
2260  test_nosync(conn);
2261  else if (strcmp(testname, "pipeline_abort") == 0)
2263  else if (strcmp(testname, "pipeline_idle") == 0)
2265  else if (strcmp(testname, "pipelined_insert") == 0)
2266  test_pipelined_insert(conn, numrows);
2267  else if (strcmp(testname, "prepared") == 0)
2269  else if (strcmp(testname, "simple_pipeline") == 0)
2271  else if (strcmp(testname, "singlerow") == 0)
2273  else if (strcmp(testname, "transaction") == 0)
2275  else if (strcmp(testname, "uniqviol") == 0)
2277  else
2278  {
2279  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2280  exit(1);
2281  }
2282 
2283  /* close the connection to the database and cleanup */
2284  PQfinish(conn);
2285  return 0;
2286 }
#define lengthof(array)
Definition: c.h:788
static PGcancel *volatile cancelConn
Definition: cancel.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:349
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:318
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:283
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:171
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:463
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:306
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:334
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:661
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:417
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:690
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6961
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7182
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7119
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4893
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:745
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7216
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7224
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7361
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7208
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3073
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2569
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3042
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2455
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2556
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2539
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3282
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2474
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2491
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3944
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1536
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1965
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3272
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3419
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2504
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2521
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3371
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3983
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
long val
Definition: informix.c:689
static struct @157 value
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:81
ExecStatusType
Definition: libpq-fe.h:118
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:137
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:131
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:133
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:134
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:135
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:466
PostgresPollingStatusType
Definition: libpq-fe.h:109
@ PGRES_POLLING_OK
Definition: libpq-fe.h:113
@ PGRES_POLLING_READING
Definition: libpq-fe.h:111
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:112
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:182
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:184
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:468
static void usage(const char *progname)
static void const char * fmt
#define MAXINT8LEN
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void confirm_query_canceled_impl(int line, PGconn *conn)
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
static void const char fflush(stdout)
va_end(args)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
#define confirm_query_canceled(conn)
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
exit(1)
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
va_start(args, fmt)
void pfree(void *pointer)
Definition: mcxt.c:1521
static AmcheckOptions opts
Definition: pg_amcheck.c:111
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:50
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
PGDLLIMPORT char * optarg
Definition: getopt.c:52
#define PG_IOLBF
Definition: port.h:361
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void pg_usleep(long microsec)
Definition: signal.c:53
PGconn * conn
Definition: streamutil.c:55
Definition: regguts.h:323
const char * description
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513