PostgreSQL Source Code  git master
libpq_pipeline.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  * Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/select.h>
19 #include <sys/time.h>
20 
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
23 #include "libpq-fe.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
26 
27 
28 static void exit_nicely(PGconn *conn);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32  int numsent);
33 
34 const char *const progname = "libpq_pipeline";
35 
36 /* Options and defaults */
37 char *tracefile = NULL; /* path to PQtrace() file */
38 
39 
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
45 
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 
56 /* max char length of an int32/64, plus sign and null terminator */
57 #define MAXINTLEN 12
58 #define MAXINT8LEN 20
59 
60 static void
62 {
63  PQfinish(conn);
64  exit(1);
65 }
66 
67 /*
68  * Print an error to stderr and terminate the program.
69  */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 static void
73 pg_fatal_impl(int line, const char *fmt,...)
74 {
75  va_list args;
76 
77 
79 
80  fprintf(stderr, "\n%s:%d: ", progname, line);
82  vfprintf(stderr, fmt, args);
84  Assert(fmt[strlen(fmt) - 1] != '\n');
85  fprintf(stderr, "\n");
86  exit(1);
87 }
88 
89 static void
91 {
92  PGresult *res = NULL;
93 
94  fprintf(stderr, "test error cases... ");
95 
96  if (PQisnonblocking(conn))
97  pg_fatal("Expected blocking connection mode");
98 
99  if (PQenterPipelineMode(conn) != 1)
100  pg_fatal("Unable to enter pipeline mode");
101 
103  pg_fatal("Pipeline mode not activated properly");
104 
105  /* PQexec should fail in pipeline mode */
106  res = PQexec(conn, "SELECT 1");
108  pg_fatal("PQexec should fail in pipeline mode but succeeded");
109  if (strcmp(PQerrorMessage(conn),
110  "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111  pg_fatal("did not get expected error message; got: \"%s\"",
113 
114  /* PQsendQuery should fail in pipeline mode */
115  if (PQsendQuery(conn, "SELECT 1") != 0)
116  pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
117  if (strcmp(PQerrorMessage(conn),
118  "PQsendQuery not allowed in pipeline mode\n") != 0)
119  pg_fatal("did not get expected error message; got: \"%s\"",
121 
122  /* Entering pipeline mode when already in pipeline mode is OK */
123  if (PQenterPipelineMode(conn) != 1)
124  pg_fatal("re-entering pipeline mode should be a no-op but failed");
125 
126  if (PQisBusy(conn) != 0)
127  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
128 
129  /* ok, back to normal command mode */
130  if (PQexitPipelineMode(conn) != 1)
131  pg_fatal("couldn't exit idle empty pipeline mode");
132 
134  pg_fatal("Pipeline mode not terminated properly");
135 
136  /* exiting pipeline mode when not in pipeline mode should be a no-op */
137  if (PQexitPipelineMode(conn) != 1)
138  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
139 
140  /* can now PQexec again */
141  res = PQexec(conn, "SELECT 1");
143  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
145 
146  fprintf(stderr, "ok\n");
147 }
148 
149 static void
151 {
152  PGresult *res = NULL;
153  const char *dummy_params[1] = {"1"};
154  Oid dummy_param_oids[1] = {INT4OID};
155 
156  fprintf(stderr, "multi pipeline... ");
157 
158  /*
159  * Queue up a couple of small pipelines and process each without returning
160  * to command mode first.
161  */
162  if (PQenterPipelineMode(conn) != 1)
163  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164 
165  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
166  dummy_params, NULL, NULL, 0) != 1)
167  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
168 
169  if (PQpipelineSync(conn) != 1)
170  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
171 
172  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
173  dummy_params, NULL, NULL, 0) != 1)
174  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
175 
176  if (PQpipelineSync(conn) != 1)
177  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
178 
179  /* OK, start processing the results */
180  res = PQgetResult(conn);
181  if (res == NULL)
182  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
184 
186  pg_fatal("Unexpected result code %s from first pipeline item",
188  PQclear(res);
189  res = NULL;
190 
191  if (PQgetResult(conn) != NULL)
192  pg_fatal("PQgetResult returned something extra after first result");
193 
194  if (PQexitPipelineMode(conn) != 0)
195  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
196 
197  res = PQgetResult(conn);
198  if (res == NULL)
199  pg_fatal("PQgetResult returned null when sync result expected: %s",
201 
203  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
205  PQclear(res);
206 
207  /* second pipeline */
208 
209  res = PQgetResult(conn);
210  if (res == NULL)
211  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
213 
215  pg_fatal("Unexpected result code %s from second pipeline item",
217 
218  res = PQgetResult(conn);
219  if (res != NULL)
220  pg_fatal("Expected null result, got %s",
222 
223  res = PQgetResult(conn);
224  if (res == NULL)
225  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
227 
229  pg_fatal("Unexpected result code %s from second pipeline sync",
231 
232  /* We're still in pipeline mode ... */
234  pg_fatal("Fell out of pipeline mode somehow");
235 
236  /* until we end it, which we can safely do now */
237  if (PQexitPipelineMode(conn) != 1)
238  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
240 
242  pg_fatal("exiting pipeline mode didn't seem to work");
243 
244  fprintf(stderr, "ok\n");
245 }
246 
247 /*
248  * Test behavior when a pipeline dispatches a number of commands that are
249  * not flushed by a sync point.
250  */
251 static void
253 {
254  int numqueries = 10;
255  int results = 0;
256  int sock = PQsocket(conn);
257 
258  fprintf(stderr, "nosync... ");
259 
260  if (sock < 0)
261  pg_fatal("invalid socket");
262 
263  if (PQenterPipelineMode(conn) != 1)
264  pg_fatal("could not enter pipeline mode");
265  for (int i = 0; i < numqueries; i++)
266  {
267  fd_set input_mask;
268  struct timeval tv;
269 
270  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
271  0, NULL, NULL, NULL, NULL, 0) != 1)
272  pg_fatal("error sending select: %s", PQerrorMessage(conn));
273  PQflush(conn);
274 
275  /*
276  * If the server has written anything to us, read (some of) it now.
277  */
278  FD_ZERO(&input_mask);
279  FD_SET(sock, &input_mask);
280  tv.tv_sec = 0;
281  tv.tv_usec = 0;
282  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
283  {
284  fprintf(stderr, "select() failed: %s\n", strerror(errno));
285  exit_nicely(conn);
286  }
287  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
288  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
289  }
290 
291  /* tell server to flush its output buffer */
292  if (PQsendFlushRequest(conn) != 1)
293  pg_fatal("failed to send flush request");
294  PQflush(conn);
295 
296  /* Now read all results */
297  for (;;)
298  {
299  PGresult *res;
300 
301  res = PQgetResult(conn);
302 
303  /* NULL results are only expected after TUPLES_OK */
304  if (res == NULL)
305  pg_fatal("got unexpected NULL result after %d results", results);
306 
307  /* We expect exactly one TUPLES_OK result for each query we sent */
309  {
310  PGresult *res2;
311 
312  /* and one NULL result should follow each */
313  res2 = PQgetResult(conn);
314  if (res2 != NULL)
315  pg_fatal("expected NULL, got %s",
316  PQresStatus(PQresultStatus(res2)));
317  PQclear(res);
318  results++;
319 
320  /* if we're done, we're done */
321  if (results == numqueries)
322  break;
323 
324  continue;
325  }
326 
327  /* anything else is unexpected */
328  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
329  }
330 
331  fprintf(stderr, "ok\n");
332 }
333 
334 /*
335  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
336  * still have to get results for each pipeline item, but the item will just be
337  * a PGRES_PIPELINE_ABORTED code.
338  *
339  * This intentionally doesn't use a transaction to wrap the pipeline. You should
340  * usually use an xact, but in this case we want to observe the effects of each
341  * statement.
342  */
343 static void
345 {
346  PGresult *res = NULL;
347  const char *dummy_params[1] = {"1"};
348  Oid dummy_param_oids[1] = {INT4OID};
349  int i;
350  int gotrows;
351  bool goterror;
352 
353  fprintf(stderr, "aborted pipeline... ");
354 
357  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
358 
361  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
362 
363  /*
364  * Queue up a couple of small pipelines and process each without returning
365  * to command mode first. Make sure the second operation in the first
366  * pipeline ERRORs.
367  */
368  if (PQenterPipelineMode(conn) != 1)
369  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
370 
371  dummy_params[0] = "1";
372  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
373  dummy_params, NULL, NULL, 0) != 1)
374  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
375 
376  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
377  1, dummy_param_oids, dummy_params,
378  NULL, NULL, 0) != 1)
379  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
380 
381  dummy_params[0] = "2";
382  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
383  dummy_params, NULL, NULL, 0) != 1)
384  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
385 
386  if (PQpipelineSync(conn) != 1)
387  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
388 
389  dummy_params[0] = "3";
390  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
391  dummy_params, NULL, NULL, 0) != 1)
392  pg_fatal("dispatching second-pipeline insert failed: %s",
394 
395  if (PQpipelineSync(conn) != 1)
396  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
397 
398  /*
399  * OK, start processing the pipeline results.
400  *
401  * We should get a command-ok for the first query, then a fatal error and
402  * a pipeline aborted message for the second insert, a pipeline-end, then
403  * a command-ok and a pipeline-ok for the second pipeline operation.
404  */
405  res = PQgetResult(conn);
406  if (res == NULL)
407  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
409  pg_fatal("Unexpected result status %s: %s",
412  PQclear(res);
413 
414  /* NULL result to signal end-of-results for this command */
415  if ((res = PQgetResult(conn)) != NULL)
416  pg_fatal("Expected null result, got %s",
418 
419  /* Second query caused error, so we expect an error next */
420  res = PQgetResult(conn);
421  if (res == NULL)
422  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
424  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
426  PQclear(res);
427 
428  /* NULL result to signal end-of-results for this command */
429  if ((res = PQgetResult(conn)) != NULL)
430  pg_fatal("Expected null result, got %s",
432 
433  /*
434  * pipeline should now be aborted.
435  *
436  * Note that we could still queue more queries at this point if we wanted;
437  * they'd get added to a new third pipeline since we've already sent a
438  * second. The aborted flag relates only to the pipeline being received.
439  */
441  pg_fatal("pipeline should be flagged as aborted but isn't");
442 
443  /* third query in pipeline, the second insert */
444  res = PQgetResult(conn);
445  if (res == NULL)
446  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
448  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
450  PQclear(res);
451 
452  /* NULL result to signal end-of-results for this command */
453  if ((res = PQgetResult(conn)) != NULL)
454  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
455 
457  pg_fatal("pipeline should be flagged as aborted but isn't");
458 
459  /* Ensure we're still in pipeline */
461  pg_fatal("Fell out of pipeline mode somehow");
462 
463  /*
464  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
465  *
466  * (This is so clients know to start processing results normally again and
467  * can tell the difference between skipped commands and the sync.)
468  */
469  res = PQgetResult(conn);
470  if (res == NULL)
471  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
473  pg_fatal("Unexpected result code from first pipeline sync\n"
474  "Expected PGRES_PIPELINE_SYNC, got %s",
476  PQclear(res);
477 
479  pg_fatal("sync should've cleared the aborted flag but didn't");
480 
481  /* We're still in pipeline mode... */
483  pg_fatal("Fell out of pipeline mode somehow");
484 
485  /* the insert from the second pipeline */
486  res = PQgetResult(conn);
487  if (res == NULL)
488  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
490  pg_fatal("Unexpected result code %s from first item in second pipeline",
492  PQclear(res);
493 
494  /* Read the NULL result at the end of the command */
495  if ((res = PQgetResult(conn)) != NULL)
496  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
497 
498  /* the second pipeline sync */
499  if ((res = PQgetResult(conn)) == NULL)
500  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
502  pg_fatal("Unexpected result code %s from second pipeline sync",
504  PQclear(res);
505 
506  if ((res = PQgetResult(conn)) != NULL)
507  pg_fatal("Expected null result, got %s: %s",
510 
511  /* Try to send two queries in one command */
512  if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
513  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
514  if (PQpipelineSync(conn) != 1)
515  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
516  goterror = false;
517  while ((res = PQgetResult(conn)) != NULL)
518  {
519  switch (PQresultStatus(res))
520  {
521  case PGRES_FATAL_ERROR:
522  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
523  pg_fatal("expected error about multiple commands, got %s",
525  printf("got expected %s", PQerrorMessage(conn));
526  goterror = true;
527  break;
528  default:
529  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
530  break;
531  }
532  }
533  if (!goterror)
534  pg_fatal("did not get cannot-insert-multiple-commands error");
535  res = PQgetResult(conn);
536  if (res == NULL)
537  pg_fatal("got NULL result");
539  pg_fatal("Unexpected result code %s from pipeline sync",
541  fprintf(stderr, "ok\n");
542 
543  /* Test single-row mode with an error partways */
544  if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
545  0, NULL, NULL, NULL, NULL, 0) != 1)
546  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
547  if (PQpipelineSync(conn) != 1)
548  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
550  goterror = false;
551  gotrows = 0;
552  while ((res = PQgetResult(conn)) != NULL)
553  {
554  switch (PQresultStatus(res))
555  {
556  case PGRES_SINGLE_TUPLE:
557  printf("got row: %s\n", PQgetvalue(res, 0, 0));
558  gotrows++;
559  break;
560  case PGRES_FATAL_ERROR:
561  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
562  pg_fatal("expected division-by-zero, got: %s (%s)",
565  printf("got expected division-by-zero\n");
566  goterror = true;
567  break;
568  default:
569  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
570  }
571  PQclear(res);
572  }
573  if (!goterror)
574  pg_fatal("did not get division-by-zero error");
575  if (gotrows != 3)
576  pg_fatal("did not get three rows");
577  /* the third pipeline sync */
578  if ((res = PQgetResult(conn)) == NULL)
579  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
581  pg_fatal("Unexpected result code %s from third pipeline sync",
583  PQclear(res);
584 
585  /* We're still in pipeline mode... */
587  pg_fatal("Fell out of pipeline mode somehow");
588 
589  /* until we end it, which we can safely do now */
590  if (PQexitPipelineMode(conn) != 1)
591  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
593 
595  pg_fatal("exiting pipeline mode didn't seem to work");
596 
597  /*-
598  * Since we fired the pipelines off without a surrounding xact, the results
599  * should be:
600  *
601  * - Implicit xact started by server around 1st pipeline
602  * - First insert applied
603  * - Second statement aborted xact
604  * - Third insert skipped
605  * - Sync rolled back first implicit xact
606  * - Implicit xact created by server around 2nd pipeline
607  * - insert applied from 2nd pipeline
608  * - Sync commits 2nd xact
609  *
610  * So we should only have the value 3 that we inserted.
611  */
612  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
613 
615  pg_fatal("Expected tuples, got %s: %s",
617  if (PQntuples(res) != 1)
618  pg_fatal("expected 1 result, got %d", PQntuples(res));
619  for (i = 0; i < PQntuples(res); i++)
620  {
621  const char *val = PQgetvalue(res, i, 0);
622 
623  if (strcmp(val, "3") != 0)
624  pg_fatal("expected only insert with value 3, got %s", val);
625  }
626 
627  PQclear(res);
628 
629  fprintf(stderr, "ok\n");
630 }
631 
632 /* State machine enum for test_pipelined_insert */
634 {
642  BI_DONE
643 };
644 
645 static void
647 {
648  Oid insert_param_oids[2] = {INT4OID, INT8OID};
649  const char *insert_params[2];
650  char insert_param_0[MAXINTLEN];
651  char insert_param_1[MAXINT8LEN];
652  enum PipelineInsertStep send_step = BI_BEGIN_TX,
653  recv_step = BI_BEGIN_TX;
654  int rows_to_send,
655  rows_to_receive;
656 
657  insert_params[0] = insert_param_0;
658  insert_params[1] = insert_param_1;
659 
660  rows_to_send = rows_to_receive = n_rows;
661 
662  /*
663  * Do a pipelined insert into a table created at the start of the pipeline
664  */
665  if (PQenterPipelineMode(conn) != 1)
666  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
667 
668  while (send_step != BI_PREPARE)
669  {
670  const char *sql;
671 
672  switch (send_step)
673  {
674  case BI_BEGIN_TX:
675  sql = "BEGIN TRANSACTION";
676  send_step = BI_DROP_TABLE;
677  break;
678 
679  case BI_DROP_TABLE:
680  sql = drop_table_sql;
681  send_step = BI_CREATE_TABLE;
682  break;
683 
684  case BI_CREATE_TABLE:
685  sql = create_table_sql;
686  send_step = BI_PREPARE;
687  break;
688 
689  default:
690  pg_fatal("invalid state");
691  sql = NULL; /* keep compiler quiet */
692  }
693 
694  pg_debug("sending: %s\n", sql);
695  if (PQsendQueryParams(conn, sql,
696  0, NULL, NULL, NULL, NULL, 0) != 1)
697  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
698  }
699 
700  Assert(send_step == BI_PREPARE);
701  pg_debug("sending: %s\n", insert_sql2);
702  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
703  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
704  send_step = BI_INSERT_ROWS;
705 
706  /*
707  * Now we start inserting. We'll be sending enough data that we could fill
708  * our output buffer, so to avoid deadlocking we need to enter nonblocking
709  * mode and consume input while we send more output. As results of each
710  * query are processed we should pop them to allow processing of the next
711  * query. There's no need to finish the pipeline before processing
712  * results.
713  */
714  if (PQsetnonblocking(conn, 1) != 0)
715  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
716 
717  while (recv_step != BI_DONE)
718  {
719  int sock;
720  fd_set input_mask;
721  fd_set output_mask;
722 
723  sock = PQsocket(conn);
724 
725  if (sock < 0)
726  break; /* shouldn't happen */
727 
728  FD_ZERO(&input_mask);
729  FD_SET(sock, &input_mask);
730  FD_ZERO(&output_mask);
731  FD_SET(sock, &output_mask);
732 
733  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
734  {
735  fprintf(stderr, "select() failed: %s\n", strerror(errno));
736  exit_nicely(conn);
737  }
738 
739  /*
740  * Process any results, so we keep the server's output buffer free
741  * flowing and it can continue to process input
742  */
743  if (FD_ISSET(sock, &input_mask))
744  {
746 
747  /* Read until we'd block if we tried to read */
748  while (!PQisBusy(conn) && recv_step < BI_DONE)
749  {
750  PGresult *res;
751  const char *cmdtag = "";
752  const char *description = "";
753  int status;
754 
755  /*
756  * Read next result. If no more results from this query,
757  * advance to the next query
758  */
759  res = PQgetResult(conn);
760  if (res == NULL)
761  continue;
762 
763  status = PGRES_COMMAND_OK;
764  switch (recv_step)
765  {
766  case BI_BEGIN_TX:
767  cmdtag = "BEGIN";
768  recv_step++;
769  break;
770  case BI_DROP_TABLE:
771  cmdtag = "DROP TABLE";
772  recv_step++;
773  break;
774  case BI_CREATE_TABLE:
775  cmdtag = "CREATE TABLE";
776  recv_step++;
777  break;
778  case BI_PREPARE:
779  cmdtag = "";
780  description = "PREPARE";
781  recv_step++;
782  break;
783  case BI_INSERT_ROWS:
784  cmdtag = "INSERT";
785  rows_to_receive--;
786  if (rows_to_receive == 0)
787  recv_step++;
788  break;
789  case BI_COMMIT_TX:
790  cmdtag = "COMMIT";
791  recv_step++;
792  break;
793  case BI_SYNC:
794  cmdtag = "";
795  description = "SYNC";
796  status = PGRES_PIPELINE_SYNC;
797  recv_step++;
798  break;
799  case BI_DONE:
800  /* unreachable */
801  pg_fatal("unreachable state");
802  }
803 
804  if (PQresultStatus(res) != status)
805  pg_fatal("%s reported status %s, expected %s\n"
806  "Error message: \"%s\"",
808  PQresStatus(status), PQerrorMessage(conn));
809 
810  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
811  pg_fatal("%s expected command tag '%s', got '%s'",
812  description, cmdtag, PQcmdStatus(res));
813 
814  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
815 
816  PQclear(res);
817  }
818  }
819 
820  /* Write more rows and/or the end pipeline message, if needed */
821  if (FD_ISSET(sock, &output_mask))
822  {
823  PQflush(conn);
824 
825  if (send_step == BI_INSERT_ROWS)
826  {
827  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
828  /* use up some buffer space with a wide value */
829  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
830 
831  if (PQsendQueryPrepared(conn, "my_insert",
832  2, insert_params, NULL, NULL, 0) == 1)
833  {
834  pg_debug("sent row %d\n", rows_to_send);
835 
836  rows_to_send--;
837  if (rows_to_send == 0)
838  send_step++;
839  }
840  else
841  {
842  /*
843  * in nonblocking mode, so it's OK for an insert to fail
844  * to send
845  */
846  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
847  rows_to_send, PQerrorMessage(conn));
848  }
849  }
850  else if (send_step == BI_COMMIT_TX)
851  {
852  if (PQsendQueryParams(conn, "COMMIT",
853  0, NULL, NULL, NULL, NULL, 0) == 1)
854  {
855  pg_debug("sent COMMIT\n");
856  send_step++;
857  }
858  else
859  {
860  fprintf(stderr, "WARNING: failed to send commit: %s\n",
862  }
863  }
864  else if (send_step == BI_SYNC)
865  {
866  if (PQpipelineSync(conn) == 1)
867  {
868  fprintf(stdout, "pipeline sync sent\n");
869  send_step++;
870  }
871  else
872  {
873  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
875  }
876  }
877  }
878  }
879 
880  /* We've got the sync message and the pipeline should be done */
881  if (PQexitPipelineMode(conn) != 1)
882  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
884 
885  if (PQsetnonblocking(conn, 0) != 0)
886  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
887 
888  fprintf(stderr, "ok\n");
889 }
890 
891 static void
893 {
894  PGresult *res = NULL;
895  Oid param_oids[1] = {INT4OID};
896  Oid expected_oids[4];
897  Oid typ;
898 
899  fprintf(stderr, "prepared... ");
900 
901  if (PQenterPipelineMode(conn) != 1)
902  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
903  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
904  "interval '1 sec'",
905  1, param_oids) != 1)
906  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
907  expected_oids[0] = INT4OID;
908  expected_oids[1] = TEXTOID;
909  expected_oids[2] = NUMERICOID;
910  expected_oids[3] = INTERVALOID;
911  if (PQsendDescribePrepared(conn, "select_one") != 1)
912  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
913  if (PQpipelineSync(conn) != 1)
914  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
915 
916  res = PQgetResult(conn);
917  if (res == NULL)
918  pg_fatal("PQgetResult returned null");
920  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
921  PQclear(res);
922  res = PQgetResult(conn);
923  if (res != NULL)
924  pg_fatal("expected NULL result");
925 
926  res = PQgetResult(conn);
927  if (res == NULL)
928  pg_fatal("PQgetResult returned NULL");
930  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
931  if (PQnfields(res) != lengthof(expected_oids))
932  pg_fatal("expected %zd columns, got %d",
933  lengthof(expected_oids), PQnfields(res));
934  for (int i = 0; i < PQnfields(res); i++)
935  {
936  typ = PQftype(res, i);
937  if (typ != expected_oids[i])
938  pg_fatal("field %d: expected type %u, got %u",
939  i, expected_oids[i], typ);
940  }
941  PQclear(res);
942  res = PQgetResult(conn);
943  if (res != NULL)
944  pg_fatal("expected NULL result");
945 
946  res = PQgetResult(conn);
948  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
949 
950  fprintf(stderr, "closing statement..");
951  if (PQsendClosePrepared(conn, "select_one") != 1)
952  pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
953  if (PQpipelineSync(conn) != 1)
954  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
955 
956  res = PQgetResult(conn);
957  if (res == NULL)
958  pg_fatal("expected non-NULL result");
960  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
961  PQclear(res);
962  res = PQgetResult(conn);
963  if (res != NULL)
964  pg_fatal("expected NULL result");
965  res = PQgetResult(conn);
967  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
968 
969  if (PQexitPipelineMode(conn) != 1)
970  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
971 
972  /* Now that it's closed we should get an error when describing */
973  res = PQdescribePrepared(conn, "select_one");
975  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
976 
977  /*
978  * Also test the blocking close, this should not fail since closing a
979  * non-existent prepared statement is a no-op
980  */
981  res = PQclosePrepared(conn, "select_one");
983  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
984 
985  fprintf(stderr, "creating portal... ");
986  PQexec(conn, "BEGIN");
987  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
989  if (PQsendDescribePortal(conn, "cursor_one") != 1)
990  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
991  if (PQpipelineSync(conn) != 1)
992  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
993  res = PQgetResult(conn);
994  if (res == NULL)
995  pg_fatal("PQgetResult returned null");
997  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
998 
999  typ = PQftype(res, 0);
1000  if (typ != INT4OID)
1001  pg_fatal("portal: expected type %u, got %u",
1002  INT4OID, typ);
1003  PQclear(res);
1004  res = PQgetResult(conn);
1005  if (res != NULL)
1006  pg_fatal("expected NULL result");
1007  res = PQgetResult(conn);
1009  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1010 
1011  fprintf(stderr, "closing portal... ");
1012  if (PQsendClosePortal(conn, "cursor_one") != 1)
1013  pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1014  if (PQpipelineSync(conn) != 1)
1015  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1016 
1017  res = PQgetResult(conn);
1018  if (res == NULL)
1019  pg_fatal("expected non-NULL result");
1021  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1022  PQclear(res);
1023  res = PQgetResult(conn);
1024  if (res != NULL)
1025  pg_fatal("expected NULL result");
1026  res = PQgetResult(conn);
1028  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1029 
1030  if (PQexitPipelineMode(conn) != 1)
1031  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1032 
1033  /* Now that it's closed we should get an error when describing */
1034  res = PQdescribePortal(conn, "cursor_one");
1036  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1037 
1038  /*
1039  * Also test the blocking close, this should not fail since closing a
1040  * non-existent portal is a no-op
1041  */
1042  res = PQclosePortal(conn, "cursor_one");
1044  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1045 
1046  fprintf(stderr, "ok\n");
1047 }
1048 
1049 /* Notice processor: print notices, and count how many we got */
1050 static void
1051 notice_processor(void *arg, const char *message)
1052 {
1053  int *n_notices = (int *) arg;
1054 
1055  (*n_notices)++;
1056  fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1057 }
1058 
1059 /* Verify behavior in "idle" state */
1060 static void
1062 {
1063  PGresult *res;
1064  int n_notices = 0;
1065 
1066  fprintf(stderr, "\npipeline idle...\n");
1067 
1069 
1070  /* Try to exit pipeline mode in pipeline-idle state */
1071  if (PQenterPipelineMode(conn) != 1)
1072  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1073  if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1074  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1076  res = PQgetResult(conn);
1077  if (res == NULL)
1078  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1079  PQerrorMessage(conn));
1081  pg_fatal("unexpected result code %s from first pipeline item",
1083  PQclear(res);
1084  res = PQgetResult(conn);
1085  if (res != NULL)
1086  pg_fatal("did not receive terminating NULL");
1087  if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1088  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1089  if (PQexitPipelineMode(conn) == 1)
1090  pg_fatal("exiting pipeline succeeded when it shouldn't");
1091  if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1092  strlen("cannot exit pipeline mode")) != 0)
1093  pg_fatal("did not get expected error; got: %s",
1094  PQerrorMessage(conn));
1096  res = PQgetResult(conn);
1098  pg_fatal("unexpected result code %s from second pipeline item",
1100  PQclear(res);
1101  res = PQgetResult(conn);
1102  if (res != NULL)
1103  pg_fatal("did not receive terminating NULL");
1104  if (PQexitPipelineMode(conn) != 1)
1105  pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1106 
1107  if (n_notices > 0)
1108  pg_fatal("got %d notice(s)", n_notices);
1109  fprintf(stderr, "ok - 1\n");
1110 
1111  /* Have a WARNING in the middle of a resultset */
1112  if (PQenterPipelineMode(conn) != 1)
1113  pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1114  if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1115  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1117  res = PQgetResult(conn);
1118  if (res == NULL)
1119  pg_fatal("unexpected NULL result received");
1121  pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1122  if (PQexitPipelineMode(conn) != 1)
1123  pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1124  fprintf(stderr, "ok - 2\n");
1125 }
1126 
1127 static void
1129 {
1130  PGresult *res = NULL;
1131  const char *dummy_params[1] = {"1"};
1132  Oid dummy_param_oids[1] = {INT4OID};
1133 
1134  fprintf(stderr, "simple pipeline... ");
1135 
1136  /*
1137  * Enter pipeline mode and dispatch a set of operations, which we'll then
1138  * process the results of as they come in.
1139  *
1140  * For a simple case we should be able to do this without interim
1141  * processing of results since our output buffer will give us enough slush
1142  * to work with and we won't block on sending. So blocking mode is fine.
1143  */
1144  if (PQisnonblocking(conn))
1145  pg_fatal("Expected blocking connection mode");
1146 
1147  if (PQenterPipelineMode(conn) != 1)
1148  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1149 
1150  if (PQsendQueryParams(conn, "SELECT $1",
1151  1, dummy_param_oids, dummy_params,
1152  NULL, NULL, 0) != 1)
1153  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1154 
1155  if (PQexitPipelineMode(conn) != 0)
1156  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1157 
1158  if (PQpipelineSync(conn) != 1)
1159  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1160 
1161  res = PQgetResult(conn);
1162  if (res == NULL)
1163  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1164  PQerrorMessage(conn));
1165 
1167  pg_fatal("Unexpected result code %s from first pipeline item",
1169 
1170  PQclear(res);
1171  res = NULL;
1172 
1173  if (PQgetResult(conn) != NULL)
1174  pg_fatal("PQgetResult returned something extra after first query result.");
1175 
1176  /*
1177  * Even though we've processed the result there's still a sync to come and
1178  * we can't exit pipeline mode yet
1179  */
1180  if (PQexitPipelineMode(conn) != 0)
1181  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1182 
1183  res = PQgetResult(conn);
1184  if (res == NULL)
1185  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1186  PQerrorMessage(conn));
1187 
1189  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1191 
1192  PQclear(res);
1193  res = NULL;
1194 
1195  if (PQgetResult(conn) != NULL)
1196  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1198 
1199  /* We're still in pipeline mode... */
1201  pg_fatal("Fell out of pipeline mode somehow");
1202 
1203  /* ... until we end it, which we can safely do now */
1204  if (PQexitPipelineMode(conn) != 1)
1205  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1206  PQerrorMessage(conn));
1207 
1209  pg_fatal("Exiting pipeline mode didn't seem to work");
1210 
1211  fprintf(stderr, "ok\n");
1212 }
1213 
1214 static void
1216 {
1217  PGresult *res;
1218  int i;
1219  bool pipeline_ended = false;
1220 
1221  if (PQenterPipelineMode(conn) != 1)
1222  pg_fatal("failed to enter pipeline mode: %s",
1223  PQerrorMessage(conn));
1224 
1225  /* One series of three commands, using single-row mode for the first two. */
1226  for (i = 0; i < 3; i++)
1227  {
1228  char *param[1];
1229 
1230  param[0] = psprintf("%d", 44 + i);
1231 
1232  if (PQsendQueryParams(conn,
1233  "SELECT generate_series(42, $1)",
1234  1,
1235  NULL,
1236  (const char **) param,
1237  NULL,
1238  NULL,
1239  0) != 1)
1240  pg_fatal("failed to send query: %s",
1241  PQerrorMessage(conn));
1242  pfree(param[0]);
1243  }
1244  if (PQpipelineSync(conn) != 1)
1245  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1246 
1247  for (i = 0; !pipeline_ended; i++)
1248  {
1249  bool first = true;
1250  bool saw_ending_tuplesok;
1251  bool isSingleTuple = false;
1252 
1253  /* Set single row mode for only first 2 SELECT queries */
1254  if (i < 2)
1255  {
1256  if (PQsetSingleRowMode(conn) != 1)
1257  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1258  }
1259 
1260  /* Consume rows for this query */
1261  saw_ending_tuplesok = false;
1262  while ((res = PQgetResult(conn)) != NULL)
1263  {
1265 
1266  if (est == PGRES_PIPELINE_SYNC)
1267  {
1268  fprintf(stderr, "end of pipeline reached\n");
1269  pipeline_ended = true;
1270  PQclear(res);
1271  if (i != 3)
1272  pg_fatal("Expected three results, got %d", i);
1273  break;
1274  }
1275 
1276  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1277  if (first)
1278  {
1279  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1280  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1281  i, PQresStatus(est));
1282  if (i >= 2 && est != PGRES_TUPLES_OK)
1283  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1284  i, PQresStatus(est));
1285  first = false;
1286  }
1287 
1288  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1289  switch (est)
1290  {
1291  case PGRES_TUPLES_OK:
1292  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1293  saw_ending_tuplesok = true;
1294  if (isSingleTuple)
1295  {
1296  if (PQntuples(res) == 0)
1297  fprintf(stderr, "all tuples received in query %d\n", i);
1298  else
1299  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1300  }
1301  break;
1302 
1303  case PGRES_SINGLE_TUPLE:
1304  isSingleTuple = true;
1305  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1306  break;
1307 
1308  default:
1309  pg_fatal("unexpected");
1310  }
1311  PQclear(res);
1312  }
1313  if (!pipeline_ended && !saw_ending_tuplesok)
1314  pg_fatal("didn't get expected terminating TUPLES_OK");
1315  }
1316 
1317  /*
1318  * Now issue one command, get its results in with single-row mode, then
1319  * issue another command, and get its results in normal mode; make sure
1320  * the single-row mode flag is reset as expected.
1321  */
1322  if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1323  0, NULL, NULL, NULL, NULL, 0) != 1)
1324  pg_fatal("failed to send query: %s",
1325  PQerrorMessage(conn));
1326  if (PQsendFlushRequest(conn) != 1)
1327  pg_fatal("failed to send flush request");
1328  if (PQsetSingleRowMode(conn) != 1)
1329  pg_fatal("PQsetSingleRowMode() failed");
1330  res = PQgetResult(conn);
1331  if (res == NULL)
1332  pg_fatal("unexpected NULL");
1334  pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1336  res = PQgetResult(conn);
1337  if (res == NULL)
1338  pg_fatal("unexpected NULL");
1340  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1342  if (PQgetResult(conn) != NULL)
1343  pg_fatal("expected NULL result");
1344 
1345  if (PQsendQueryParams(conn, "SELECT 1",
1346  0, NULL, NULL, NULL, NULL, 0) != 1)
1347  pg_fatal("failed to send query: %s",
1348  PQerrorMessage(conn));
1349  if (PQsendFlushRequest(conn) != 1)
1350  pg_fatal("failed to send flush request");
1351  res = PQgetResult(conn);
1352  if (res == NULL)
1353  pg_fatal("unexpected NULL");
1355  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1357  if (PQgetResult(conn) != NULL)
1358  pg_fatal("expected NULL result");
1359 
1360  if (PQexitPipelineMode(conn) != 1)
1361  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1362 
1363  fprintf(stderr, "ok\n");
1364 }
1365 
1366 /*
1367  * Simple test to verify that a pipeline is discarded as a whole when there's
1368  * an error, ignoring transaction commands.
1369  */
1370 static void
1372 {
1373  PGresult *res;
1374  bool expect_null;
1375  int num_syncs = 0;
1376 
1377  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1378  "CREATE TABLE pq_pipeline_tst (id int)");
1380  pg_fatal("failed to create test table: %s",
1381  PQerrorMessage(conn));
1382  PQclear(res);
1383 
1384  if (PQenterPipelineMode(conn) != 1)
1385  pg_fatal("failed to enter pipeline mode: %s",
1386  PQerrorMessage(conn));
1387  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1388  pg_fatal("could not send prepare on pipeline: %s",
1389  PQerrorMessage(conn));
1390 
1391  if (PQsendQueryParams(conn,
1392  "BEGIN",
1393  0, NULL, NULL, NULL, NULL, 0) != 1)
1394  pg_fatal("failed to send query: %s",
1395  PQerrorMessage(conn));
1396  if (PQsendQueryParams(conn,
1397  "SELECT 0/0",
1398  0, NULL, NULL, NULL, NULL, 0) != 1)
1399  pg_fatal("failed to send query: %s",
1400  PQerrorMessage(conn));
1401 
1402  /*
1403  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1404  * get out of the pipeline-aborted state first.
1405  */
1406  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1407  pg_fatal("failed to execute prepared: %s",
1408  PQerrorMessage(conn));
1409 
1410  /* This insert fails because we're in pipeline-aborted state */
1411  if (PQsendQueryParams(conn,
1412  "INSERT INTO pq_pipeline_tst VALUES (1)",
1413  0, NULL, NULL, NULL, NULL, 0) != 1)
1414  pg_fatal("failed to send query: %s",
1415  PQerrorMessage(conn));
1416  if (PQpipelineSync(conn) != 1)
1417  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1418  num_syncs++;
1419 
1420  /*
1421  * This insert fails even though the pipeline got a SYNC, because we're in
1422  * an aborted transaction
1423  */
1424  if (PQsendQueryParams(conn,
1425  "INSERT INTO pq_pipeline_tst VALUES (2)",
1426  0, NULL, NULL, NULL, NULL, 0) != 1)
1427  pg_fatal("failed to send query: %s",
1428  PQerrorMessage(conn));
1429  if (PQpipelineSync(conn) != 1)
1430  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1431  num_syncs++;
1432 
1433  /*
1434  * Send ROLLBACK using prepared stmt. This one works because we just did
1435  * PQpipelineSync above.
1436  */
1437  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1438  pg_fatal("failed to execute prepared: %s",
1439  PQerrorMessage(conn));
1440 
1441  /*
1442  * Now that we're out of a transaction and in pipeline-good mode, this
1443  * insert works
1444  */
1445  if (PQsendQueryParams(conn,
1446  "INSERT INTO pq_pipeline_tst VALUES (3)",
1447  0, NULL, NULL, NULL, NULL, 0) != 1)
1448  pg_fatal("failed to send query: %s",
1449  PQerrorMessage(conn));
1450  /* Send two syncs now -- match up to SYNC messages below */
1451  if (PQpipelineSync(conn) != 1)
1452  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1453  num_syncs++;
1454  if (PQpipelineSync(conn) != 1)
1455  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1456  num_syncs++;
1457 
1458  expect_null = false;
1459  for (int i = 0;; i++)
1460  {
1461  ExecStatusType restype;
1462 
1463  res = PQgetResult(conn);
1464  if (res == NULL)
1465  {
1466  printf("%d: got NULL result\n", i);
1467  if (!expect_null)
1468  pg_fatal("did not expect NULL here");
1469  expect_null = false;
1470  continue;
1471  }
1472  restype = PQresultStatus(res);
1473  printf("%d: got status %s", i, PQresStatus(restype));
1474  if (expect_null)
1475  pg_fatal("expected NULL");
1476  if (restype == PGRES_FATAL_ERROR)
1477  printf("; error: %s", PQerrorMessage(conn));
1478  else if (restype == PGRES_PIPELINE_ABORTED)
1479  {
1480  printf(": command didn't run because pipeline aborted\n");
1481  }
1482  else
1483  printf("\n");
1484  PQclear(res);
1485 
1486  if (restype == PGRES_PIPELINE_SYNC)
1487  num_syncs--;
1488  else
1489  expect_null = true;
1490  if (num_syncs <= 0)
1491  break;
1492  }
1493  if (PQgetResult(conn) != NULL)
1494  pg_fatal("returned something extra after all the syncs: %s",
1496 
1497  if (PQexitPipelineMode(conn) != 1)
1498  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1499 
1500  /* We expect to find one tuple containing the value "3" */
1501  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1503  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1504  if (PQntuples(res) != 1)
1505  pg_fatal("did not get 1 tuple");
1506  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1507  pg_fatal("did not get expected tuple");
1508  PQclear(res);
1509 
1510  fprintf(stderr, "ok\n");
1511 }
1512 
1513 /*
1514  * In this test mode we send a stream of queries, with one in the middle
1515  * causing an error. Verify that we can still send some more after the
1516  * error and have libpq work properly.
1517  */
1518 static void
1520 {
1521  int sock = PQsocket(conn);
1522  PGresult *res;
1523  Oid paramTypes[2] = {INT8OID, INT8OID};
1524  const char *paramValues[2];
1525  char paramValue0[MAXINT8LEN];
1526  char paramValue1[MAXINT8LEN];
1527  int ctr = 0;
1528  int numsent = 0;
1529  int results = 0;
1530  bool read_done = false;
1531  bool write_done = false;
1532  bool error_sent = false;
1533  bool got_error = false;
1534  int switched = 0;
1535  int socketful = 0;
1536  fd_set in_fds;
1537  fd_set out_fds;
1538 
1539  fprintf(stderr, "uniqviol ...");
1540 
1541  PQsetnonblocking(conn, 1);
1542 
1543  paramValues[0] = paramValue0;
1544  paramValues[1] = paramValue1;
1545  sprintf(paramValue1, "42");
1546 
1547  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1548  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1550  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1551 
1552  res = PQexec(conn, "begin");
1554  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1555 
1556  res = PQprepare(conn, "insertion",
1557  "insert into ppln_uniqviol values ($1, $2) returning id",
1558  2, paramTypes);
1559  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1560  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1561 
1562  if (PQenterPipelineMode(conn) != 1)
1563  pg_fatal("failed to enter pipeline mode");
1564 
1565  while (!read_done)
1566  {
1567  /*
1568  * Avoid deadlocks by reading everything the server has sent before
1569  * sending anything. (Special precaution is needed here to process
1570  * PQisBusy before testing the socket for read-readiness, because the
1571  * socket does not turn read-ready after "sending" queries in aborted
1572  * pipeline mode.)
1573  */
1574  while (PQisBusy(conn) == 0)
1575  {
1576  bool new_error;
1577 
1578  if (results >= numsent)
1579  {
1580  if (write_done)
1581  read_done = true;
1582  break;
1583  }
1584 
1585  res = PQgetResult(conn);
1586  new_error = process_result(conn, res, results, numsent);
1587  if (new_error && got_error)
1588  pg_fatal("got two errors");
1589  got_error |= new_error;
1590  if (results++ >= numsent - 1)
1591  {
1592  if (write_done)
1593  read_done = true;
1594  break;
1595  }
1596  }
1597 
1598  if (read_done)
1599  break;
1600 
1601  FD_ZERO(&out_fds);
1602  FD_SET(sock, &out_fds);
1603 
1604  FD_ZERO(&in_fds);
1605  FD_SET(sock, &in_fds);
1606 
1607  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1608  {
1609  if (errno == EINTR)
1610  continue;
1611  pg_fatal("select() failed: %m");
1612  }
1613 
1614  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1615  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1616 
1617  /*
1618  * If the socket is writable and we haven't finished sending queries,
1619  * send some.
1620  */
1621  if (!write_done && FD_ISSET(sock, &out_fds))
1622  {
1623  for (;;)
1624  {
1625  int flush;
1626 
1627  /*
1628  * provoke uniqueness violation exactly once after having
1629  * switched to read mode.
1630  */
1631  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1632  {
1633  sprintf(paramValue0, "%d", numsent / 2);
1634  fprintf(stderr, "E");
1635  error_sent = true;
1636  }
1637  else
1638  {
1639  fprintf(stderr, ".");
1640  sprintf(paramValue0, "%d", ctr++);
1641  }
1642 
1643  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1644  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1645  numsent++;
1646 
1647  /* Are we done writing? */
1648  if (socketful != 0 && numsent % socketful == 42 && error_sent)
1649  {
1650  if (PQsendFlushRequest(conn) != 1)
1651  pg_fatal("failed to send flush request");
1652  write_done = true;
1653  fprintf(stderr, "\ndone writing\n");
1654  PQflush(conn);
1655  break;
1656  }
1657 
1658  /* is the outgoing socket full? */
1659  flush = PQflush(conn);
1660  if (flush == -1)
1661  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1662  if (flush == 1)
1663  {
1664  if (socketful == 0)
1665  socketful = numsent;
1666  fprintf(stderr, "\nswitch to reading\n");
1667  switched++;
1668  break;
1669  }
1670  }
1671  }
1672  }
1673 
1674  if (!got_error)
1675  pg_fatal("did not get expected error");
1676 
1677  fprintf(stderr, "ok\n");
1678 }
1679 
1680 /*
1681  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1682  * the expected NULL that should follow it.
1683  *
1684  * Returns true if we read a fatal error message, otherwise false.
1685  */
1686 static bool
1687 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1688 {
1689  PGresult *res2;
1690  bool got_error = false;
1691 
1692  if (res == NULL)
1693  pg_fatal("got unexpected NULL");
1694 
1695  switch (PQresultStatus(res))
1696  {
1697  case PGRES_FATAL_ERROR:
1698  got_error = true;
1699  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1700  PQclear(res);
1701 
1702  res2 = PQgetResult(conn);
1703  if (res2 != NULL)
1704  pg_fatal("expected NULL, got %s",
1705  PQresStatus(PQresultStatus(res2)));
1706  break;
1707 
1708  case PGRES_TUPLES_OK:
1709  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1710  PQclear(res);
1711 
1712  res2 = PQgetResult(conn);
1713  if (res2 != NULL)
1714  pg_fatal("expected NULL, got %s",
1715  PQresStatus(PQresultStatus(res2)));
1716  break;
1717 
1719  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1720  res2 = PQgetResult(conn);
1721  if (res2 != NULL)
1722  pg_fatal("expected NULL, got %s",
1723  PQresStatus(PQresultStatus(res2)));
1724  break;
1725 
1726  default:
1727  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1728  }
1729 
1730  return got_error;
1731 }
1732 
1733 
1734 static void
1735 usage(const char *progname)
1736 {
1737  fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
1738  fprintf(stderr, "Usage:\n");
1739  fprintf(stderr, " %s [OPTION] tests\n", progname);
1740  fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
1741  fprintf(stderr, "\nOptions:\n");
1742  fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1743  fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
1744 }
1745 
1746 static void
1748 {
1749  printf("disallowed_in_pipeline\n");
1750  printf("multi_pipelines\n");
1751  printf("nosync\n");
1752  printf("pipeline_abort\n");
1753  printf("pipeline_idle\n");
1754  printf("pipelined_insert\n");
1755  printf("prepared\n");
1756  printf("simple_pipeline\n");
1757  printf("singlerow\n");
1758  printf("transaction\n");
1759  printf("uniqviol\n");
1760 }
1761 
1762 int
1763 main(int argc, char **argv)
1764 {
1765  const char *conninfo = "";
1766  PGconn *conn;
1767  FILE *trace;
1768  char *testname;
1769  int numrows = 10000;
1770  PGresult *res;
1771  int c;
1772 
1773  while ((c = getopt(argc, argv, "r:t:")) != -1)
1774  {
1775  switch (c)
1776  {
1777  case 'r': /* numrows */
1778  errno = 0;
1779  numrows = strtol(optarg, NULL, 10);
1780  if (errno != 0 || numrows <= 0)
1781  {
1782  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1783  optarg);
1784  exit(1);
1785  }
1786  break;
1787  case 't': /* trace file */
1789  break;
1790  }
1791  }
1792 
1793  if (optind < argc)
1794  {
1795  testname = pg_strdup(argv[optind]);
1796  optind++;
1797  }
1798  else
1799  {
1800  usage(argv[0]);
1801  exit(1);
1802  }
1803 
1804  if (strcmp(testname, "tests") == 0)
1805  {
1806  print_test_list();
1807  exit(0);
1808  }
1809 
1810  if (optind < argc)
1811  {
1812  conninfo = pg_strdup(argv[optind]);
1813  optind++;
1814  }
1815 
1816  /* Make a connection to the database */
1817  conn = PQconnectdb(conninfo);
1818  if (PQstatus(conn) != CONNECTION_OK)
1819  {
1820  fprintf(stderr, "Connection to database failed: %s\n",
1821  PQerrorMessage(conn));
1822  exit_nicely(conn);
1823  }
1824 
1825  res = PQexec(conn, "SET lc_messages TO \"C\"");
1827  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1828  res = PQexec(conn, "SET debug_parallel_query = off");
1830  pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
1831 
1832  /* Set the trace file, if requested */
1833  if (tracefile != NULL)
1834  {
1835  if (strcmp(tracefile, "-") == 0)
1836  trace = stdout;
1837  else
1838  trace = fopen(tracefile, "w");
1839  if (trace == NULL)
1840  pg_fatal("could not open file \"%s\": %m", tracefile);
1841 
1842  /* Make it line-buffered */
1843  setvbuf(trace, NULL, PG_IOLBF, 0);
1844 
1845  PQtrace(conn, trace);
1848  }
1849 
1850  if (strcmp(testname, "disallowed_in_pipeline") == 0)
1852  else if (strcmp(testname, "multi_pipelines") == 0)
1854  else if (strcmp(testname, "nosync") == 0)
1855  test_nosync(conn);
1856  else if (strcmp(testname, "pipeline_abort") == 0)
1858  else if (strcmp(testname, "pipeline_idle") == 0)
1860  else if (strcmp(testname, "pipelined_insert") == 0)
1861  test_pipelined_insert(conn, numrows);
1862  else if (strcmp(testname, "prepared") == 0)
1864  else if (strcmp(testname, "simple_pipeline") == 0)
1866  else if (strcmp(testname, "singlerow") == 0)
1868  else if (strcmp(testname, "transaction") == 0)
1870  else if (strcmp(testname, "uniqviol") == 0)
1872  else
1873  {
1874  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1875  exit(1);
1876  }
1877 
1878  /* close the connection to the database and cleanup */
1879  PQfinish(conn);
1880  return 0;
1881 }
#define lengthof(array)
Definition: c.h:777
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7248
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7195
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4602
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:731
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7290
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7427
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7274
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1498
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1929
int PQflush(PGconn *conn)
Definition: fe-exec.c:3914
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2272
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3633
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3039
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2535
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3008
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3325
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2421
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2522
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3341
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2505
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3395
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2440
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2457
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2228
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1957
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3790
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3666
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3858
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1542
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1422
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3226
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3333
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2470
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2487
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2004
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1639
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3293
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3380
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3897
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3403
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2035
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
long val
Definition: informix.c:664
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:60
ExecStatusType
Definition: libpq-fe.h:95
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:415
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:158
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:160
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:417
static void usage(const char *progname)
static void const char * fmt
#define MAXINT8LEN
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void const char fflush(stdout)
va_end(args)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
char * tracefile
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
exit(1)
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
va_start(args, fmt)
void pfree(void *pointer)
Definition: mcxt.c:1456
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:50
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
PGDLLIMPORT char * optarg
Definition: getopt.c:52
#define PG_IOLBF
Definition: port.h:361
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
PGconn * conn
Definition: streamutil.c:54
const char * description
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495