PostgreSQL Source Code  git master
libpq_pipeline.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  * Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/select.h>
19 #include <sys/time.h>
20 
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
23 #include "libpq-fe.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
26 
27 
28 static void exit_nicely(PGconn *conn);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32  int numsent);
33 
/* Program name; pg_fatal_impl prints it as the prefix of fatal messages. */
34 const char *const progname = "libpq_pipeline";
35 
36 /* Options and defaults */
37 char *tracefile = NULL; /* path to PQtrace() file */
38 
39 
/*
 * pg_debug(): printf-style tracing to stderr.  It only produces output when
 * the program is built with DEBUG_OUTPUT defined; otherwise every call
 * expands to nothing.
 */
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
45 
/*
 * SQL text shared by the tests below.  All statements operate on the scratch
 * table pq_pipeline_demo.  Note that create_table_sql is built from two
 * adjacent string literals that the compiler concatenates.
 */
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
/* One-parameter insert: $1 is itemno. */
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
/* Two-parameter insert: $1 is itemno, $2 is int8filler. */
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 
/*
 * Buffer sizes for the decimal text of an integer, with room for a leading
 * minus sign and the terminating NUL:
 *
 *	int32: "-2147483648"          -> 11 characters + NUL = 12
 *	int64: "-9223372036854775808" -> 20 characters + NUL = 21
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
59 
60 static void
62 {
63  PQfinish(conn);
64  exit(1);
65 }
66 
67 /*
68  * Print an error to stderr and terminate the program.
69  */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 static void
73 pg_fatal_impl(int line, const char *fmt,...)
74 {
75  va_list args;
76 
77 
79 
80  fprintf(stderr, "\n%s:%d: ", progname, line);
82  vfprintf(stderr, fmt, args);
84  Assert(fmt[strlen(fmt) - 1] != '\n');
85  fprintf(stderr, "\n");
86  exit(1);
87 }
88 
/*
 * Error-case test: while the connection is in pipeline mode, PQexec and
 * PQsendQuery must be rejected with specific error messages; entering
 * pipeline mode twice and exiting it twice must both succeed; and PQexec
 * must be usable again once pipeline mode has been left.
 *
 * Any failed expectation aborts the program through pg_fatal().
 *
 * NOTE(review): this listing is missing lines (the function declarator after
 * "static void", several "if" conditions, and the trailing arguments of some
 * pg_fatal() calls).  Restore them from the upstream file before treating
 * the text below as compilable.
 */
89 static void
91 {
92  PGresult *res = NULL;
93 
94  fprintf(stderr, "test error cases... ");
95 
96  if (PQisnonblocking(conn))
97  pg_fatal("Expected blocking connection mode");
98 
99  if (PQenterPipelineMode(conn) != 1)
100  pg_fatal("Unable to enter pipeline mode");
101 
 /* NOTE(review): the condition guarding this pg_fatal is not visible here. */
103  pg_fatal("Pipeline mode not activated properly");
104 
105  /* PQexec should fail in pipeline mode */
106  res = PQexec(conn, "SELECT 1");
108  pg_fatal("PQexec should fail in pipeline mode but succeeded");
109  if (strcmp(PQerrorMessage(conn),
110  "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111  pg_fatal("did not get expected error message; got: \"%s\"",
113 
114  /* PQsendQuery should fail in pipeline mode */
115  if (PQsendQuery(conn, "SELECT 1") != 0)
116  pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
117  if (strcmp(PQerrorMessage(conn),
118  "PQsendQuery not allowed in pipeline mode\n") != 0)
119  pg_fatal("did not get expected error message; got: \"%s\"",
121 
122  /* Entering pipeline mode when already in pipeline mode is OK */
123  if (PQenterPipelineMode(conn) != 1)
124  pg_fatal("re-entering pipeline mode should be a no-op but failed");
125 
126  if (PQisBusy(conn) != 0)
127  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
128 
129  /* ok, back to normal command mode */
130  if (PQexitPipelineMode(conn) != 1)
131  pg_fatal("couldn't exit idle empty pipeline mode");
132 
134  pg_fatal("Pipeline mode not terminated properly");
135 
136  /* exiting pipeline mode when not in pipeline mode should be a no-op */
137  if (PQexitPipelineMode(conn) != 1)
138  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
139 
140  /* can now PQexec again */
141  res = PQexec(conn, "SELECT 1");
143  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
145 
146  fprintf(stderr, "ok\n");
147 }
148 
/*
 * Multi-pipeline test: sends two one-query pipelines back to back (query,
 * sync, query, sync) and only then reads the results.  For each pipeline it
 * expects a query result, a terminating NULL, and a sync result.  It also
 * checks that pipeline mode cannot be exited while results are still
 * pending, and that it can be exited once everything has been consumed.
 *
 * NOTE(review): this listing is missing lines (the function declarator,
 * several "if" conditions and trailing pg_fatal() arguments); confirm
 * against the upstream file.
 */
149 static void
151 {
152  PGresult *res = NULL;
153  const char *dummy_params[1] = {"1"};
154  Oid dummy_param_oids[1] = {INT4OID};
155 
156  fprintf(stderr, "multi pipeline... ");
157 
158  /*
159  * Queue up a couple of small pipelines and process each without returning
160  * to command mode first.
161  */
162  if (PQenterPipelineMode(conn) != 1)
163  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164 
165  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
166  dummy_params, NULL, NULL, 0) != 1)
167  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
168 
169  if (PQpipelineSync(conn) != 1)
170  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
171 
172  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
173  dummy_params, NULL, NULL, 0) != 1)
174  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
175 
176  if (PQpipelineSync(conn) != 1)
177  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
178 
179  /* OK, start processing the results */
180  res = PQgetResult(conn);
181  if (res == NULL)
182  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
184 
186  pg_fatal("Unexpected result code %s from first pipeline item",
188  PQclear(res);
189  res = NULL;
190 
191  if (PQgetResult(conn) != NULL)
192  pg_fatal("PQgetResult returned something extra after first result");
193 
 /* The first pipeline's sync result has not been read yet, so exit must fail. */
194  if (PQexitPipelineMode(conn) != 0)
195  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
196 
197  res = PQgetResult(conn);
198  if (res == NULL)
199  pg_fatal("PQgetResult returned null when sync result expected: %s",
201 
203  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
205  PQclear(res);
206 
207  /* second pipeline */
208 
209  res = PQgetResult(conn);
210  if (res == NULL)
211  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
213 
215  pg_fatal("Unexpected result code %s from second pipeline item",
217 
 /* NOTE(review): no PQclear(res) is visible before res is overwritten below. */
218  res = PQgetResult(conn);
219  if (res != NULL)
220  pg_fatal("Expected null result, got %s",
222 
223  res = PQgetResult(conn);
224  if (res == NULL)
225  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
227 
229  pg_fatal("Unexpected result code %s from second pipeline sync",
231 
232  /* We're still in pipeline mode ... */
234  pg_fatal("Fell out of pipeline mode somehow");
235 
236  /* until we end it, which we can safely do now */
237  if (PQexitPipelineMode(conn) != 1)
238  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
240 
242  pg_fatal("exiting pipeline mode didn't seem to work");
243 
244  fprintf(stderr, "ok\n");
245 }
246 
247 /*
248  * Test behavior when a pipeline dispatches a number of commands that are
249  * not flushed by a sync point.
250  */
/*
 * Sends numqueries identical SELECTs in pipeline mode without any sync,
 * opportunistically reading whatever the server has already sent after each
 * one (zero-timeout select).  It then issues a flush request and expects
 * one result plus one terminating NULL per query.
 *
 * NOTE(review): this listing is missing lines (the function declarator and
 * the "if" that tests for a TUPLES_OK result); confirm against upstream.
 */
251 static void
253 {
254  int numqueries = 10;
255  int results = 0;
256  int sock = PQsocket(conn);
257 
258  fprintf(stderr, "nosync... ");
259 
260  if (sock < 0)
261  pg_fatal("invalid socket");
262 
263  if (PQenterPipelineMode(conn) != 1)
264  pg_fatal("could not enter pipeline mode");
265  for (int i = 0; i < numqueries; i++)
266  {
267  fd_set input_mask;
268  struct timeval tv;
269 
270  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
271  0, NULL, NULL, NULL, NULL, 0) != 1)
272  pg_fatal("error sending select: %s", PQerrorMessage(conn));
273  PQflush(conn);
274 
275  /*
276  * If the server has written anything to us, read (some of) it now.
277  */
278  FD_ZERO(&input_mask);
279  FD_SET(sock, &input_mask);
 /* A zero timeout makes select() a non-blocking poll. */
280  tv.tv_sec = 0;
281  tv.tv_usec = 0;
282  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
283  {
284  fprintf(stderr, "select() failed: %s\n", strerror(errno));
285  exit_nicely(conn);
286  }
287  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
288  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
289  }
290 
291  /* tell server to flush its output buffer */
292  if (PQsendFlushRequest(conn) != 1)
293  pg_fatal("failed to send flush request");
294  PQflush(conn);
295 
296  /* Now read all results */
297  for (;;)
298  {
299  PGresult *res;
300 
301  res = PQgetResult(conn);
302 
303  /* NULL results are only expected after TUPLES_OK */
304  if (res == NULL)
305  pg_fatal("got unexpected NULL result after %d results", results);
306 
307  /* We expect exactly one TUPLES_OK result for each query we sent */
309  {
310  PGresult *res2;
311 
312  /* and one NULL result should follow each */
313  res2 = PQgetResult(conn);
314  if (res2 != NULL)
315  pg_fatal("expected NULL, got %s",
316  PQresStatus(PQresultStatus(res2)));
317  PQclear(res);
318  results++;
319 
320  /* if we're done, we're done */
321  if (results == numqueries)
322  break;
323 
324  continue;
325  }
326 
327  /* anything else is unexpected */
 /*
  * NOTE(review): this format string ends in a newline, which the Assert in
  * pg_fatal_impl rejects (it appends the newline itself).
  */
328  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
329  }
330 
331  fprintf(stderr, "ok\n");
332 }
333 
334 /*
335  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
336  * still have to get results for each pipeline item, but the item will just be
337  * a PGRES_PIPELINE_ABORTED code.
338  *
339  * This intentionally doesn't use a transaction to wrap the pipeline. You should
340  * usually use an xact, but in this case we want to observe the effects of each
341  * statement.
342  */
/*
 * NOTE(review): this listing is missing lines (the function declarator, the
 * statements that run the DROP/CREATE TABLE commands, most "if" conditions
 * that test result or pipeline status, and trailing pg_fatal() arguments).
 * Restore them from the upstream file before treating the text below as
 * compilable.
 *
 * Sequence exercised, as far as the visible code shows:
 *   1. first pipeline: insert 1, a SELECT that errors, insert 2, sync;
 *   2. second pipeline: insert 3, sync;
 *   3. a multi-statement query, which must fail with SQLSTATE 42601;
 *   4. a query that yields rows and then fails with SQLSTATE 22012;
 *   5. after leaving pipeline mode, the table must hold only the value 3.
 */
343 static void
345 {
346  PGresult *res = NULL;
347  const char *dummy_params[1] = {"1"};
348  Oid dummy_param_oids[1] = {INT4OID};
349  int i;
350  int gotrows;
351  bool goterror;
352 
353  fprintf(stderr, "aborted pipeline... ");
354 
357  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
358 
361  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
362 
363  /*
364  * Queue up a couple of small pipelines and process each without returning
365  * to command mode first. Make sure the second operation in the first
366  * pipeline ERRORs.
367  */
368  if (PQenterPipelineMode(conn) != 1)
369  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
370 
371  dummy_params[0] = "1";
372  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
373  dummy_params, NULL, NULL, 0) != 1)
374  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
375 
376  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
377  1, dummy_param_oids, dummy_params,
378  NULL, NULL, 0) != 1)
379  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
380 
381  dummy_params[0] = "2";
382  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
383  dummy_params, NULL, NULL, 0) != 1)
384  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
385 
386  if (PQpipelineSync(conn) != 1)
387  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
388 
389  dummy_params[0] = "3";
390  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
391  dummy_params, NULL, NULL, 0) != 1)
392  pg_fatal("dispatching second-pipeline insert failed: %s",
394 
395  if (PQpipelineSync(conn) != 1)
396  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
397 
398  /*
399  * OK, start processing the pipeline results.
400  *
401  * We should get a command-ok for the first query, then a fatal error and
402  * a pipeline aborted message for the second insert, a pipeline-end, then
403  * a command-ok and a pipeline-ok for the second pipeline operation.
404  */
405  res = PQgetResult(conn);
406  if (res == NULL)
407  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
409  pg_fatal("Unexpected result status %s: %s",
412  PQclear(res);
413 
414  /* NULL result to signal end-of-results for this command */
415  if ((res = PQgetResult(conn)) != NULL)
416  pg_fatal("Expected null result, got %s",
418 
419  /* Second query caused error, so we expect an error next */
420  res = PQgetResult(conn);
421  if (res == NULL)
422  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
424  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
426  PQclear(res);
427 
428  /* NULL result to signal end-of-results for this command */
429  if ((res = PQgetResult(conn)) != NULL)
430  pg_fatal("Expected null result, got %s",
432 
433  /*
434  * pipeline should now be aborted.
435  *
436  * Note that we could still queue more queries at this point if we wanted;
437  * they'd get added to a new third pipeline since we've already sent a
438  * second. The aborted flag relates only to the pipeline being received.
439  */
441  pg_fatal("pipeline should be flagged as aborted but isn't");
442 
443  /* third query in pipeline, the second insert */
444  res = PQgetResult(conn);
445  if (res == NULL)
446  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
448  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
450  PQclear(res);
451 
452  /* NULL result to signal end-of-results for this command */
453  if ((res = PQgetResult(conn)) != NULL)
454  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
455 
457  pg_fatal("pipeline should be flagged as aborted but isn't");
458 
459  /* Ensure we're still in pipeline */
461  pg_fatal("Fell out of pipeline mode somehow");
462 
463  /*
464  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
465  *
466  * (This is so clients know to start processing results normally again and
467  * can tell the difference between skipped commands and the sync.)
468  */
469  res = PQgetResult(conn);
470  if (res == NULL)
471  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
473  pg_fatal("Unexpected result code from first pipeline sync\n"
474  "Expected PGRES_PIPELINE_SYNC, got %s",
476  PQclear(res);
477 
479  pg_fatal("sync should've cleared the aborted flag but didn't");
480 
481  /* We're still in pipeline mode... */
483  pg_fatal("Fell out of pipeline mode somehow");
484 
485  /* the insert from the second pipeline */
486  res = PQgetResult(conn);
487  if (res == NULL)
488  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
490  pg_fatal("Unexpected result code %s from first item in second pipeline",
492  PQclear(res);
493 
494  /* Read the NULL result at the end of the command */
495  if ((res = PQgetResult(conn)) != NULL)
496  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
497 
498  /* the second pipeline sync */
499  if ((res = PQgetResult(conn)) == NULL)
500  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
502  pg_fatal("Unexpected result code %s from second pipeline sync",
504  PQclear(res);
505 
506  if ((res = PQgetResult(conn)) != NULL)
507  pg_fatal("Expected null result, got %s: %s",
510 
511  /* Try to send two queries in one command */
512  if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
513  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
514  if (PQpipelineSync(conn) != 1)
515  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
516  goterror = false;
 /*
  * NOTE(review): as shown, the results fetched by this loop are never passed
  * to PQclear(), unlike the similar loop further down.
  */
517  while ((res = PQgetResult(conn)) != NULL)
518  {
519  switch (PQresultStatus(res))
520  {
521  case PGRES_FATAL_ERROR:
 /* SQLSTATE 42601 is the code this test accepts for the multi-command error. */
522  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
523  pg_fatal("expected error about multiple commands, got %s",
525  printf("got expected %s", PQerrorMessage(conn));
526  goterror = true;
527  break;
528  default:
529  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
530  break;
531  }
532  }
533  if (!goterror)
534  pg_fatal("did not get cannot-insert-multiple-commands error");
535  res = PQgetResult(conn);
536  if (res == NULL)
537  pg_fatal("got NULL result");
539  pg_fatal("Unexpected result code %s from pipeline sync",
 /* NOTE(review): no PQclear(res) is visible for this sync result either. */
541  fprintf(stderr, "ok\n");
542 
543  /* Test single-row mode with an error partway */
544  if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
545  0, NULL, NULL, NULL, NULL, 0) != 1)
546  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
547  if (PQpipelineSync(conn) != 1)
548  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
 /*
  * NOTE(review): the comment above mentions single-row mode, but no call
  * enabling it is visible here (listing line 549 is absent) -- confirm.
  */
550  goterror = false;
551  gotrows = 0;
552  while ((res = PQgetResult(conn)) != NULL)
553  {
554  switch (PQresultStatus(res))
555  {
556  case PGRES_SINGLE_TUPLE:
557  printf("got row: %s\n", PQgetvalue(res, 0, 0));
558  gotrows++;
559  break;
560  case PGRES_FATAL_ERROR:
 /* SQLSTATE 22012 is the code this test accepts as division by zero. */
561  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
562  pg_fatal("expected division-by-zero, got: %s (%s)",
565  printf("got expected division-by-zero\n");
566  goterror = true;
567  break;
568  default:
569  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
570  }
571  PQclear(res);
572  }
573  if (!goterror)
574  pg_fatal("did not get division-by-zero error");
 /* The series runs 3, 2, 1, 0, -1: three rows are expected before g = 0 fails. */
575  if (gotrows != 3)
576  pg_fatal("did not get three rows");
577  /* the third pipeline sync */
578  if ((res = PQgetResult(conn)) == NULL)
579  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
581  pg_fatal("Unexpected result code %s from third pipeline sync",
583  PQclear(res);
584 
585  /* We're still in pipeline mode... */
587  pg_fatal("Fell out of pipeline mode somehow");
588 
589  /* until we end it, which we can safely do now */
590  if (PQexitPipelineMode(conn) != 1)
591  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
593 
595  pg_fatal("exiting pipeline mode didn't seem to work");
596 
597  /*-
598  * Since we fired the pipelines off without a surrounding xact, the results
599  * should be:
600  *
601  * - Implicit xact started by server around 1st pipeline
602  * - First insert applied
603  * - Second statement aborted xact
604  * - Third insert skipped
605  * - Sync rolled back first implicit xact
606  * - Implicit xact created by server around 2nd pipeline
607  * - insert applied from 2nd pipeline
608  * - Sync commits 2nd xact
609  *
610  * So we should only have the value 3 that we inserted.
611  */
612  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
613 
615  pg_fatal("Expected tuples, got %s: %s",
617  if (PQntuples(res) != 1)
618  pg_fatal("expected 1 result, got %d", PQntuples(res));
619  for (i = 0; i < PQntuples(res); i++)
620  {
621  const char *val = PQgetvalue(res, i, 0);
622 
623  if (strcmp(val, "3") != 0)
624  pg_fatal("expected only insert with value 3, got %s", val);
625  }
626 
627  PQclear(res);
628 
629  fprintf(stderr, "ok\n");
630 }
631 
/*
 * State machine enum for test_pipelined_insert.
 *
 * The same enum tracks both the sending side and the receiving side.  The
 * declaration order is significant: the test advances a state with "++" and
 * compares states with "<", so each step must be immediately followed by the
 * step that comes next in the pipeline.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX,				/* BEGIN TRANSACTION */
	BI_DROP_TABLE,				/* drop the scratch table */
	BI_CREATE_TABLE,			/* create the scratch table */
	BI_PREPARE,					/* prepare the insert statement */
	BI_INSERT_ROWS,				/* run the prepared insert once per row */
	BI_COMMIT_TX,				/* COMMIT */
	BI_SYNC,					/* pipeline sync */
	BI_DONE						/* nothing left to do */
};
644 
/*
 * Pipelined bulk insert, driven by two independent state machines that share
 * enum PipelineInsertStep: send_step tracks what has been dispatched and
 * recv_step tracks which result is expected next.
 *
 * BEGIN, DROP TABLE, CREATE TABLE and the PREPARE are sent up front in
 * blocking mode.  The connection is then switched to nonblocking mode and a
 * select() loop interleaves reading results with sending the row inserts,
 * the COMMIT and the pipeline sync, until the sync result has been received.
 *
 * NOTE(review): this listing is missing lines (the function declarator, and
 * a few statements and call arguments).  n_rows is used below without a
 * visible declaration; presumably it is a parameter -- confirm against
 * upstream.
 */
645 static void
647 {
648  Oid insert_param_oids[2] = {INT4OID, INT8OID};
649  const char *insert_params[2];
650  char insert_param_0[MAXINTLEN];
651  char insert_param_1[MAXINT8LEN];
652  enum PipelineInsertStep send_step = BI_BEGIN_TX,
653  recv_step = BI_BEGIN_TX;
654  int rows_to_send,
655  rows_to_receive;
656 
 /* The parameter array points at the two text buffers refilled for each row. */
657  insert_params[0] = insert_param_0;
658  insert_params[1] = insert_param_1;
659 
660  rows_to_send = rows_to_receive = n_rows;
661 
662  /*
663  * Do a pipelined insert into a table created at the start of the pipeline
664  */
665  if (PQenterPipelineMode(conn) != 1)
666  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
667 
 /* Send the setup statements one at a time until the PREPARE step is reached. */
668  while (send_step != BI_PREPARE)
669  {
670  const char *sql;
671 
672  switch (send_step)
673  {
674  case BI_BEGIN_TX:
675  sql = "BEGIN TRANSACTION";
676  send_step = BI_DROP_TABLE;
677  break;
678 
679  case BI_DROP_TABLE:
680  sql = drop_table_sql;
681  send_step = BI_CREATE_TABLE;
682  break;
683 
684  case BI_CREATE_TABLE:
685  sql = create_table_sql;
686  send_step = BI_PREPARE;
687  break;
688 
689  default:
690  pg_fatal("invalid state");
691  sql = NULL; /* keep compiler quiet */
692  }
693 
694  pg_debug("sending: %s\n", sql);
695  if (PQsendQueryParams(conn, sql,
696  0, NULL, NULL, NULL, NULL, 0) != 1)
697  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
698  }
699 
700  Assert(send_step == BI_PREPARE);
701  pg_debug("sending: %s\n", insert_sql2);
702  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
703  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
704  send_step = BI_INSERT_ROWS;
705 
706  /*
707  * Now we start inserting. We'll be sending enough data that we could fill
708  * our output buffer, so to avoid deadlocking we need to enter nonblocking
709  * mode and consume input while we send more output. As results of each
710  * query are processed we should pop them to allow processing of the next
711  * query. There's no need to finish the pipeline before processing
712  * results.
713  */
714  if (PQsetnonblocking(conn, 1) != 0)
715  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
716 
717  while (recv_step != BI_DONE)
718  {
719  int sock;
720  fd_set input_mask;
721  fd_set output_mask;
722 
723  sock = PQsocket(conn);
724 
725  if (sock < 0)
726  break; /* shouldn't happen */
727 
728  FD_ZERO(&input_mask);
729  FD_SET(sock, &input_mask);
730  FD_ZERO(&output_mask);
731  FD_SET(sock, &output_mask);
732 
 /* No timeout: wait until the socket is readable or writable. */
733  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
734  {
735  fprintf(stderr, "select() failed: %s\n", strerror(errno));
736  exit_nicely(conn);
737  }
738 
739  /*
740  * Process any results, so we keep the server's output buffer free
741  * flowing and it can continue to process input
742  */
743  if (FD_ISSET(sock, &input_mask))
744  {
 /*
  * NOTE(review): listing line 745 is absent; nothing visible in this branch
  * reads from the socket before PQisBusy() is polled -- confirm upstream.
  */
746 
747  /* Read until we'd block if we tried to read */
748  while (!PQisBusy(conn) && recv_step < BI_DONE)
749  {
750  PGresult *res;
751  const char *cmdtag = "";
752  const char *description = "";
753  int status;
754 
755  /*
756  * Read next result. If no more results from this query,
757  * advance to the next query
758  */
759  res = PQgetResult(conn);
760  if (res == NULL)
761  continue;
762 
 /*
  * Work out what this result should look like.  Every step expects
  * PGRES_COMMAND_OK except the sync, which overrides status below.
  */
763  status = PGRES_COMMAND_OK;
764  switch (recv_step)
765  {
766  case BI_BEGIN_TX:
767  cmdtag = "BEGIN";
768  recv_step++;
769  break;
770  case BI_DROP_TABLE:
771  cmdtag = "DROP TABLE";
772  recv_step++;
773  break;
774  case BI_CREATE_TABLE:
775  cmdtag = "CREATE TABLE";
776  recv_step++;
777  break;
778  case BI_PREPARE:
779  cmdtag = "";
780  description = "PREPARE";
781  recv_step++;
782  break;
783  case BI_INSERT_ROWS:
784  cmdtag = "INSERT";
 /* Stay in this step until a result has arrived for every row. */
785  rows_to_receive--;
786  if (rows_to_receive == 0)
787  recv_step++;
788  break;
789  case BI_COMMIT_TX:
790  cmdtag = "COMMIT";
791  recv_step++;
792  break;
793  case BI_SYNC:
794  cmdtag = "";
795  description = "SYNC";
796  status = PGRES_PIPELINE_SYNC;
797  recv_step++;
798  break;
799  case BI_DONE:
800  /* unreachable */
801  pg_fatal("unreachable state");
802  }
803 
804  if (PQresultStatus(res) != status)
805  pg_fatal("%s reported status %s, expected %s\n"
806  "Error message: \"%s\"",
807  description, PQresStatus(PQresultStatus(res)),
808  PQresStatus(status), PQerrorMessage(conn));
809 
 /* Prefix match only; an empty cmdtag therefore always matches. */
810  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
811  pg_fatal("%s expected command tag '%s', got '%s'",
812  description, cmdtag, PQcmdStatus(res));
813 
814  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
815 
816  PQclear(res);
817  }
818  }
819 
820  /* Write more rows and/or the end pipeline message, if needed */
821  if (FD_ISSET(sock, &output_mask))
822  {
823  PQflush(conn);
824 
825  if (send_step == BI_INSERT_ROWS)
826  {
 /* itemno counts down from n_rows to 1, since rows_to_send is decremented after each send. */
827  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
828  /* use up some buffer space with a wide value */
829  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
830 
831  if (PQsendQueryPrepared(conn, "my_insert",
832  2, insert_params, NULL, NULL, 0) == 1)
833  {
834  pg_debug("sent row %d\n", rows_to_send);
835 
836  rows_to_send--;
837  if (rows_to_send == 0)
838  send_step++;
839  }
840  else
841  {
842  /*
843  * in nonblocking mode, so it's OK for an insert to fail
844  * to send
845  */
846  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
847  rows_to_send, PQerrorMessage(conn));
848  }
849  }
850  else if (send_step == BI_COMMIT_TX)
851  {
852  if (PQsendQueryParams(conn, "COMMIT",
853  0, NULL, NULL, NULL, NULL, 0) == 1)
854  {
855  pg_debug("sent COMMIT\n");
856  send_step++;
857  }
858  else
859  {
860  fprintf(stderr, "WARNING: failed to send commit: %s\n",
862  }
863  }
864  else if (send_step == BI_SYNC)
865  {
866  if (PQpipelineSync(conn) == 1)
867  {
868  fprintf(stdout, "pipeline sync sent\n");
869  send_step++;
870  }
871  else
872  {
873  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
875  }
876  }
877  }
878  }
879 
880  /* We've got the sync message and the pipeline should be done */
881  if (PQexitPipelineMode(conn) != 1)
882  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
884 
885  if (PQsetnonblocking(conn, 0) != 0)
886  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
887 
888  fprintf(stderr, "ok\n");
889 }
890 
/*
 * Prepared-statement test.  In pipeline mode it prepares "select_one" and
 * sends a describe for it, then checks that the described result has the
 * four column types listed in expected_oids.  After leaving pipeline mode
 * it declares a cursor with PQexec, sends a describe for the portal
 * "cursor_one", and checks that its first column is INT4OID.
 *
 * NOTE(review): this listing is missing lines (the function declarator,
 * several "if" conditions, and at least one statement between the DECLARE
 * and PQsendDescribePortal); confirm against the upstream file.
 */
891 static void
893 {
894  PGresult *res = NULL;
895  Oid param_oids[1] = {INT4OID};
896  Oid expected_oids[4];
897  Oid typ;
898 
899  fprintf(stderr, "prepared... ");
900 
901  if (PQenterPipelineMode(conn) != 1)
902  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
903  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
904  "interval '1 sec'",
905  1, param_oids) != 1)
906  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
 /* Expected column types, in the order of the SELECT list above. */
907  expected_oids[0] = INT4OID;
908  expected_oids[1] = TEXTOID;
909  expected_oids[2] = NUMERICOID;
910  expected_oids[3] = INTERVALOID;
911  if (PQsendDescribePrepared(conn, "select_one") != 1)
912  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
913  if (PQpipelineSync(conn) != 1)
914  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
915 
 /* First result: outcome of the PREPARE, followed by its terminating NULL. */
916  res = PQgetResult(conn);
917  if (res == NULL)
918  pg_fatal("PQgetResult returned null");
920  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
921  PQclear(res);
922  res = PQgetResult(conn);
923  if (res != NULL)
924  pg_fatal("expected NULL result");
925 
 /* Second result: the description of the prepared statement. */
926  res = PQgetResult(conn);
927  if (res == NULL)
928  pg_fatal("PQgetResult returned NULL");
930  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
931  if (PQnfields(res) != lengthof(expected_oids))
 /* NOTE(review): %zd is the signed conversion; if lengthof() yields a size_t, %zu is the matching one. */
932  pg_fatal("expected %zd columns, got %d",
933  lengthof(expected_oids), PQnfields(res));
934  for (int i = 0; i < PQnfields(res); i++)
935  {
936  typ = PQftype(res, i);
937  if (typ != expected_oids[i])
938  pg_fatal("field %d: expected type %u, got %u",
939  i, expected_oids[i], typ);
940  }
941  PQclear(res);
942  res = PQgetResult(conn);
943  if (res != NULL)
944  pg_fatal("expected NULL result");
945 
946  res = PQgetResult(conn);
948  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
949 
950  if (PQexitPipelineMode(conn) != 1)
951  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
952 
 /*
  * NOTE(review): the PGresult objects returned by these two PQexec calls are
  * neither checked nor cleared, as shown.
  */
953  PQexec(conn, "BEGIN");
954  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
956  if (PQsendDescribePortal(conn, "cursor_one") != 1)
957  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
958  if (PQpipelineSync(conn) != 1)
959  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
960  res = PQgetResult(conn);
 /* NOTE(review): the message below is misleading; this branch fires when a result was expected but NULL came back. */
961  if (res == NULL)
962  pg_fatal("expected NULL result");
964  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
965 
966  typ = PQftype(res, 0);
967  if (typ != INT4OID)
968  pg_fatal("portal: expected type %u, got %u",
969  INT4OID, typ);
970  PQclear(res);
971  res = PQgetResult(conn);
972  if (res != NULL)
973  pg_fatal("expected NULL result");
974  res = PQgetResult(conn);
976  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
977 
978  if (PQexitPipelineMode(conn) != 1)
979  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
980 
981  fprintf(stderr, "ok\n");
982 }
983 
/*
 * Notice processor callback: count the notice and echo it to stderr.
 *
 * arg:     points to an int holding the number of notices seen so far; it is
 *          incremented on every call
 * message: notice text, printed as-is after a "NOTICE <n>: " prefix
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;
	int			seen = ++(*counter);

	fprintf(stderr, "NOTICE %d: %s", seen, message);
}
993 
994 /* Verify behavior in "idle" state */
/*
 * Part 1 sends queries in pipeline mode without a sync, reads each result
 * and its terminating NULL, and checks that pipeline mode cannot be exited
 * while a sent query is still unread, but can be exited afterwards.  Part 2
 * sends a query that calls pg_advisory_unlock and reads its result (the
 * existing comment says a WARNING is expected mid-resultset), then leaves
 * pipeline mode.
 *
 * NOTE(review): this listing is missing lines (the function declarator,
 * several "if" conditions and some statements).  In particular nothing
 * visible here registers notice_processor or passes &n_notices anywhere, so
 * as shown n_notices can never change; presumably one of the missing lines
 * installs it -- confirm against upstream.
 */
995 static void
997 {
998  PGresult *res;
999  int n_notices = 0;
1000 
1001  fprintf(stderr, "\npipeline idle...\n");
1002 
1004 
1005  /* Try to exit pipeline mode in pipeline-idle state */
1006  if (PQenterPipelineMode(conn) != 1)
1007  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1008  if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1009  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1011  res = PQgetResult(conn);
1012  if (res == NULL)
1013  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1014  PQerrorMessage(conn));
1016  pg_fatal("unexpected result code %s from first pipeline item",
1018  PQclear(res);
1019  res = PQgetResult(conn);
1020  if (res != NULL)
1021  pg_fatal("did not receive terminating NULL");
1022  if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1023  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
 /* A query has been sent but not read, so the exit attempt must be refused. */
1024  if (PQexitPipelineMode(conn) == 1)
1025  pg_fatal("exiting pipeline succeeded when it shouldn't");
 /* Only the start of the error message is compared. */
1026  if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1027  strlen("cannot exit pipeline mode")) != 0)
1028  pg_fatal("did not get expected error; got: %s",
1029  PQerrorMessage(conn));
1031  res = PQgetResult(conn);
1033  pg_fatal("unexpected result code %s from second pipeline item",
1035  PQclear(res);
1036  res = PQgetResult(conn);
1037  if (res != NULL)
1038  pg_fatal("did not receive terminating NULL");
1039  if (PQexitPipelineMode(conn) != 1)
1040  pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1041 
1042  if (n_notices > 0)
1043  pg_fatal("got %d notice(s)", n_notices);
1044  fprintf(stderr, "ok - 1\n");
1045 
1046  /* Have a WARNING in the middle of a resultset */
1047  if (PQenterPipelineMode(conn) != 1)
1048  pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1049  if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1050  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1052  res = PQgetResult(conn);
1053  if (res == NULL)
1054  pg_fatal("unexpected NULL result received");
1056  pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
 /* NOTE(review): no PQclear(res) is visible before the function returns. */
1057  if (PQexitPipelineMode(conn) != 1)
1058  pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1059  fprintf(stderr, "ok - 2\n");
1060 }
1061 
/*
 * Simplest pipeline round trip on a blocking connection: one parameterized
 * SELECT followed by a sync.  The test reads the query result, its
 * terminating NULL, the sync result and a final NULL.  Along the way it
 * checks that pipeline mode cannot be exited while work is outstanding,
 * and that it can be exited once everything has been consumed.
 *
 * NOTE(review): this listing is missing lines (the function declarator,
 * several "if" conditions and trailing pg_fatal() arguments); confirm
 * against the upstream file.
 */
1062 static void
1064 {
1065  PGresult *res = NULL;
1066  const char *dummy_params[1] = {"1"};
1067  Oid dummy_param_oids[1] = {INT4OID};
1068 
1069  fprintf(stderr, "simple pipeline... ");
1070 
1071  /*
1072  * Enter pipeline mode and dispatch a set of operations, which we'll then
1073  * process the results of as they come in.
1074  *
1075  * For a simple case we should be able to do this without interim
1076  * processing of results since our output buffer will give us enough slush
1077  * to work with and we won't block on sending. So blocking mode is fine.
1078  */
1079  if (PQisnonblocking(conn))
1080  pg_fatal("Expected blocking connection mode");
1081 
1082  if (PQenterPipelineMode(conn) != 1)
1083  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1084 
1085  if (PQsendQueryParams(conn, "SELECT $1",
1086  1, dummy_param_oids, dummy_params,
1087  NULL, NULL, 0) != 1)
1088  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1089 
 /* A query is queued, so leaving pipeline mode must be refused (returns 0). */
1090  if (PQexitPipelineMode(conn) != 0)
1091  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1092 
1093  if (PQpipelineSync(conn) != 1)
1094  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1095 
1096  res = PQgetResult(conn);
1097  if (res == NULL)
1098  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1099  PQerrorMessage(conn));
1100 
1102  pg_fatal("Unexpected result code %s from first pipeline item",
1104 
1105  PQclear(res);
1106  res = NULL;
1107 
1108  if (PQgetResult(conn) != NULL)
1109  pg_fatal("PQgetResult returned something extra after first query result.");
1110 
1111  /*
1112  * Even though we've processed the result there's still a sync to come and
1113  * we can't exit pipeline mode yet
1114  */
1115  if (PQexitPipelineMode(conn) != 0)
1116  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1117 
1118  res = PQgetResult(conn);
1119  if (res == NULL)
1120  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1121  PQerrorMessage(conn));
1122 
1124  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1126 
1127  PQclear(res);
1128  res = NULL;
1129 
1130  if (PQgetResult(conn) != NULL)
1131  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1133 
1134  /* We're still in pipeline mode... */
1136  pg_fatal("Fell out of pipeline mode somehow");
1137 
1138  /* ... until we end it, which we can safely do now */
1139  if (PQexitPipelineMode(conn) != 1)
1140  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1141  PQerrorMessage(conn));
1142 
1144  pg_fatal("Exiting pipeline mode didn't seem to work");
1145 
1146  fprintf(stderr, "ok\n");
1147 }
1148 
1149 static void
1151 {
1152  PGresult *res;
1153  int i;
1154  bool pipeline_ended = false;
1155 
1156  if (PQenterPipelineMode(conn) != 1)
1157  pg_fatal("failed to enter pipeline mode: %s",
1158  PQerrorMessage(conn));
1159 
1160  /* One series of three commands, using single-row mode for the first two. */
1161  for (i = 0; i < 3; i++)
1162  {
1163  char *param[1];
1164 
1165  param[0] = psprintf("%d", 44 + i);
1166 
1167  if (PQsendQueryParams(conn,
1168  "SELECT generate_series(42, $1)",
1169  1,
1170  NULL,
1171  (const char **) param,
1172  NULL,
1173  NULL,
1174  0) != 1)
1175  pg_fatal("failed to send query: %s",
1176  PQerrorMessage(conn));
1177  pfree(param[0]);
1178  }
1179  if (PQpipelineSync(conn) != 1)
1180  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1181 
1182  for (i = 0; !pipeline_ended; i++)
1183  {
1184  bool first = true;
1185  bool saw_ending_tuplesok;
1186  bool isSingleTuple = false;
1187 
1188  /* Set single row mode for only first 2 SELECT queries */
1189  if (i < 2)
1190  {
1191  if (PQsetSingleRowMode(conn) != 1)
1192  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1193  }
1194 
1195  /* Consume rows for this query */
1196  saw_ending_tuplesok = false;
1197  while ((res = PQgetResult(conn)) != NULL)
1198  {
1200 
1201  if (est == PGRES_PIPELINE_SYNC)
1202  {
1203  fprintf(stderr, "end of pipeline reached\n");
1204  pipeline_ended = true;
1205  PQclear(res);
1206  if (i != 3)
1207  pg_fatal("Expected three results, got %d", i);
1208  break;
1209  }
1210 
1211  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1212  if (first)
1213  {
1214  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1215  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1216  i, PQresStatus(est));
1217  if (i >= 2 && est != PGRES_TUPLES_OK)
1218  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1219  i, PQresStatus(est));
1220  first = false;
1221  }
1222 
1223  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1224  switch (est)
1225  {
1226  case PGRES_TUPLES_OK:
1227  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1228  saw_ending_tuplesok = true;
1229  if (isSingleTuple)
1230  {
1231  if (PQntuples(res) == 0)
1232  fprintf(stderr, "all tuples received in query %d\n", i);
1233  else
1234  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1235  }
1236  break;
1237 
1238  case PGRES_SINGLE_TUPLE:
1239  isSingleTuple = true;
1240  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1241  break;
1242 
1243  default:
1244  pg_fatal("unexpected");
1245  }
1246  PQclear(res);
1247  }
1248  if (!pipeline_ended && !saw_ending_tuplesok)
1249  pg_fatal("didn't get expected terminating TUPLES_OK");
1250  }
1251 
1252  /*
1253  * Now issue one command, get its results in with single-row mode, then
1254  * issue another command, and get its results in normal mode; make sure
1255  * the single-row mode flag is reset as expected.
1256  */
1257  if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1258  0, NULL, NULL, NULL, NULL, 0) != 1)
1259  pg_fatal("failed to send query: %s",
1260  PQerrorMessage(conn));
1261  if (PQsendFlushRequest(conn) != 1)
1262  pg_fatal("failed to send flush request");
1263  if (PQsetSingleRowMode(conn) != 1)
1264  pg_fatal("PQsetSingleRowMode() failed");
1265  res = PQgetResult(conn);
1266  if (res == NULL)
1267  pg_fatal("unexpected NULL");
1269  pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1271  res = PQgetResult(conn);
1272  if (res == NULL)
1273  pg_fatal("unexpected NULL");
1275  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1277  if (PQgetResult(conn) != NULL)
1278  pg_fatal("expected NULL result");
1279 
1280  if (PQsendQueryParams(conn, "SELECT 1",
1281  0, NULL, NULL, NULL, NULL, 0) != 1)
1282  pg_fatal("failed to send query: %s",
1283  PQerrorMessage(conn));
1284  if (PQsendFlushRequest(conn) != 1)
1285  pg_fatal("failed to send flush request");
1286  res = PQgetResult(conn);
1287  if (res == NULL)
1288  pg_fatal("unexpected NULL");
1290  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1292  if (PQgetResult(conn) != NULL)
1293  pg_fatal("expected NULL result");
1294 
1295  if (PQexitPipelineMode(conn) != 1)
1296  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1297 
1298  fprintf(stderr, "ok\n");
1299 }
1300 
1301 /*
1302  * Simple test to verify that a pipeline is discarded as a whole when there's
1303  * an error, ignoring transaction commands.
1304  */
1305 static void
1307 {
1308  PGresult *res;
1309  bool expect_null;
1310  int num_syncs = 0;
1311 
1312  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1313  "CREATE TABLE pq_pipeline_tst (id int)");
1315  pg_fatal("failed to create test table: %s",
1316  PQerrorMessage(conn));
1317  PQclear(res);
1318 
1319  if (PQenterPipelineMode(conn) != 1)
1320  pg_fatal("failed to enter pipeline mode: %s",
1321  PQerrorMessage(conn));
1322  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1323  pg_fatal("could not send prepare on pipeline: %s",
1324  PQerrorMessage(conn));
1325 
1326  if (PQsendQueryParams(conn,
1327  "BEGIN",
1328  0, NULL, NULL, NULL, NULL, 0) != 1)
1329  pg_fatal("failed to send query: %s",
1330  PQerrorMessage(conn));
1331  if (PQsendQueryParams(conn,
1332  "SELECT 0/0",
1333  0, NULL, NULL, NULL, NULL, 0) != 1)
1334  pg_fatal("failed to send query: %s",
1335  PQerrorMessage(conn));
1336 
1337  /*
1338  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1339  * get out of the pipeline-aborted state first.
1340  */
1341  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1342  pg_fatal("failed to execute prepared: %s",
1343  PQerrorMessage(conn));
1344 
1345  /* This insert fails because we're in pipeline-aborted state */
1346  if (PQsendQueryParams(conn,
1347  "INSERT INTO pq_pipeline_tst VALUES (1)",
1348  0, NULL, NULL, NULL, NULL, 0) != 1)
1349  pg_fatal("failed to send query: %s",
1350  PQerrorMessage(conn));
1351  if (PQpipelineSync(conn) != 1)
1352  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353  num_syncs++;
1354 
1355  /*
1356  * This insert fails even though the pipeline got a SYNC, because we're in
1357  * an aborted transaction
1358  */
1359  if (PQsendQueryParams(conn,
1360  "INSERT INTO pq_pipeline_tst VALUES (2)",
1361  0, NULL, NULL, NULL, NULL, 0) != 1)
1362  pg_fatal("failed to send query: %s",
1363  PQerrorMessage(conn));
1364  if (PQpipelineSync(conn) != 1)
1365  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1366  num_syncs++;
1367 
1368  /*
1369  * Send ROLLBACK using prepared stmt. This one works because we just did
1370  * PQpipelineSync above.
1371  */
1372  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1373  pg_fatal("failed to execute prepared: %s",
1374  PQerrorMessage(conn));
1375 
1376  /*
1377  * Now that we're out of a transaction and in pipeline-good mode, this
1378  * insert works
1379  */
1380  if (PQsendQueryParams(conn,
1381  "INSERT INTO pq_pipeline_tst VALUES (3)",
1382  0, NULL, NULL, NULL, NULL, 0) != 1)
1383  pg_fatal("failed to send query: %s",
1384  PQerrorMessage(conn));
1385  /* Send two syncs now -- match up to SYNC messages below */
1386  if (PQpipelineSync(conn) != 1)
1387  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1388  num_syncs++;
1389  if (PQpipelineSync(conn) != 1)
1390  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1391  num_syncs++;
1392 
1393  expect_null = false;
1394  for (int i = 0;; i++)
1395  {
1396  ExecStatusType restype;
1397 
1398  res = PQgetResult(conn);
1399  if (res == NULL)
1400  {
1401  printf("%d: got NULL result\n", i);
1402  if (!expect_null)
1403  pg_fatal("did not expect NULL here");
1404  expect_null = false;
1405  continue;
1406  }
1407  restype = PQresultStatus(res);
1408  printf("%d: got status %s", i, PQresStatus(restype));
1409  if (expect_null)
1410  pg_fatal("expected NULL");
1411  if (restype == PGRES_FATAL_ERROR)
1412  printf("; error: %s", PQerrorMessage(conn));
1413  else if (restype == PGRES_PIPELINE_ABORTED)
1414  {
1415  printf(": command didn't run because pipeline aborted\n");
1416  }
1417  else
1418  printf("\n");
1419  PQclear(res);
1420 
1421  if (restype == PGRES_PIPELINE_SYNC)
1422  num_syncs--;
1423  else
1424  expect_null = true;
1425  if (num_syncs <= 0)
1426  break;
1427  }
1428  if (PQgetResult(conn) != NULL)
1429  pg_fatal("returned something extra after all the syncs: %s",
1431 
1432  if (PQexitPipelineMode(conn) != 1)
1433  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1434 
1435  /* We expect to find one tuple containing the value "3" */
1436  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1438  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1439  if (PQntuples(res) != 1)
1440  pg_fatal("did not get 1 tuple");
1441  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1442  pg_fatal("did not get expected tuple");
1443  PQclear(res);
1444 
1445  fprintf(stderr, "ok\n");
1446 }
1447 
1448 /*
1449  * In this test mode we send a stream of queries, with one in the middle
1450  * causing an error. Verify that we can still send some more after the
1451  * error and have libpq work properly.
1452  */
1453 static void
1455 {
1456  int sock = PQsocket(conn);
1457  PGresult *res;
1458  Oid paramTypes[2] = {INT8OID, INT8OID};
1459  const char *paramValues[2];
1460  char paramValue0[MAXINT8LEN];
1461  char paramValue1[MAXINT8LEN];
1462  int ctr = 0;
1463  int numsent = 0;
1464  int results = 0;
1465  bool read_done = false;
1466  bool write_done = false;
1467  bool error_sent = false;
1468  bool got_error = false;
1469  int switched = 0;
1470  int socketful = 0;
1471  fd_set in_fds;
1472  fd_set out_fds;
1473 
1474  fprintf(stderr, "uniqviol ...");
1475 
1476  PQsetnonblocking(conn, 1);
1477 
1478  paramValues[0] = paramValue0;
1479  paramValues[1] = paramValue1;
1480  sprintf(paramValue1, "42");
1481 
1482  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1483  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1485  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1486 
1487  res = PQexec(conn, "begin");
1489  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1490 
1491  res = PQprepare(conn, "insertion",
1492  "insert into ppln_uniqviol values ($1, $2) returning id",
1493  2, paramTypes);
1494  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1495  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1496 
1497  if (PQenterPipelineMode(conn) != 1)
1498  pg_fatal("failed to enter pipeline mode");
1499 
1500  while (!read_done)
1501  {
1502  /*
1503  * Avoid deadlocks by reading everything the server has sent before
1504  * sending anything. (Special precaution is needed here to process
1505  * PQisBusy before testing the socket for read-readiness, because the
1506  * socket does not turn read-ready after "sending" queries in aborted
1507  * pipeline mode.)
1508  */
1509  while (PQisBusy(conn) == 0)
1510  {
1511  bool new_error;
1512 
1513  if (results >= numsent)
1514  {
1515  if (write_done)
1516  read_done = true;
1517  break;
1518  }
1519 
1520  res = PQgetResult(conn);
1521  new_error = process_result(conn, res, results, numsent);
1522  if (new_error && got_error)
1523  pg_fatal("got two errors");
1524  got_error |= new_error;
1525  if (results++ >= numsent - 1)
1526  {
1527  if (write_done)
1528  read_done = true;
1529  break;
1530  }
1531  }
1532 
1533  if (read_done)
1534  break;
1535 
1536  FD_ZERO(&out_fds);
1537  FD_SET(sock, &out_fds);
1538 
1539  FD_ZERO(&in_fds);
1540  FD_SET(sock, &in_fds);
1541 
1542  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1543  {
1544  if (errno == EINTR)
1545  continue;
1546  pg_fatal("select() failed: %m");
1547  }
1548 
1549  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1550  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1551 
1552  /*
1553  * If the socket is writable and we haven't finished sending queries,
1554  * send some.
1555  */
1556  if (!write_done && FD_ISSET(sock, &out_fds))
1557  {
1558  for (;;)
1559  {
1560  int flush;
1561 
1562  /*
1563  * provoke uniqueness violation exactly once after having
1564  * switched to read mode.
1565  */
1566  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1567  {
1568  sprintf(paramValue0, "%d", numsent / 2);
1569  fprintf(stderr, "E");
1570  error_sent = true;
1571  }
1572  else
1573  {
1574  fprintf(stderr, ".");
1575  sprintf(paramValue0, "%d", ctr++);
1576  }
1577 
1578  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1579  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1580  numsent++;
1581 
1582  /* Are we done writing? */
1583  if (socketful != 0 && numsent % socketful == 42 && error_sent)
1584  {
1585  if (PQsendFlushRequest(conn) != 1)
1586  pg_fatal("failed to send flush request");
1587  write_done = true;
1588  fprintf(stderr, "\ndone writing\n");
1589  PQflush(conn);
1590  break;
1591  }
1592 
1593  /* is the outgoing socket full? */
1594  flush = PQflush(conn);
1595  if (flush == -1)
1596  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1597  if (flush == 1)
1598  {
1599  if (socketful == 0)
1600  socketful = numsent;
1601  fprintf(stderr, "\nswitch to reading\n");
1602  switched++;
1603  break;
1604  }
1605  }
1606  }
1607  }
1608 
1609  if (!got_error)
1610  pg_fatal("did not get expected error");
1611 
1612  fprintf(stderr, "ok\n");
1613 }
1614 
1615 /*
1616  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1617  * the expected NULL that should follow it.
1618  *
1619  * Returns true if we read a fatal error message, otherwise false.
1620  */
1621 static bool
1622 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1623 {
1624  PGresult *res2;
1625  bool got_error = false;
1626 
1627  if (res == NULL)
1628  pg_fatal("got unexpected NULL");
1629 
1630  switch (PQresultStatus(res))
1631  {
1632  case PGRES_FATAL_ERROR:
1633  got_error = true;
1634  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1635  PQclear(res);
1636 
1637  res2 = PQgetResult(conn);
1638  if (res2 != NULL)
1639  pg_fatal("expected NULL, got %s",
1640  PQresStatus(PQresultStatus(res2)));
1641  break;
1642 
1643  case PGRES_TUPLES_OK:
1644  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1645  PQclear(res);
1646 
1647  res2 = PQgetResult(conn);
1648  if (res2 != NULL)
1649  pg_fatal("expected NULL, got %s",
1650  PQresStatus(PQresultStatus(res2)));
1651  break;
1652 
1654  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1655  res2 = PQgetResult(conn);
1656  if (res2 != NULL)
1657  pg_fatal("expected NULL, got %s",
1658  PQresStatus(PQresultStatus(res2)));
1659  break;
1660 
1661  default:
1662  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1663  }
1664 
1665  return got_error;
1666 }
1667 
1668 
/*
 * Write the command-line help text to stderr.
 *
 * "progname" is substituted as the program name in the summary line and in
 * both usage lines.
 */
static void
usage(const char *progname)
{
	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n"
			"\nOptions:\n"
			" -t TRACEFILE generate a libpq trace to TRACEFILE\n"
			" -r NUMROWS use NUMROWS as the test size\n",
			progname, progname, progname);
}
/*
 * Print the name of every test that main() can dispatch to, one per line,
 * on stdout.  This is the output of the "tests" pseudo test name.
 */
static void
print_test_list(void)
{
	/* NULL-terminated; keep in step with the dispatch chain in main() */
	static const char *const test_names[] = {
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
		NULL
	};

	for (int i = 0; test_names[i] != NULL; i++)
		printf("%s\n", test_names[i]);
}
1696 
1697 int
1698 main(int argc, char **argv)
1699 {
1700  const char *conninfo = "";
1701  PGconn *conn;
1702  FILE *trace;
1703  char *testname;
1704  int numrows = 10000;
1705  PGresult *res;
1706  int c;
1707 
1708  while ((c = getopt(argc, argv, "r:t:")) != -1)
1709  {
1710  switch (c)
1711  {
1712  case 'r': /* numrows */
1713  errno = 0;
1714  numrows = strtol(optarg, NULL, 10);
1715  if (errno != 0 || numrows <= 0)
1716  {
1717  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1718  optarg);
1719  exit(1);
1720  }
1721  break;
1722  case 't': /* trace file */
1724  break;
1725  }
1726  }
1727 
1728  if (optind < argc)
1729  {
1730  testname = pg_strdup(argv[optind]);
1731  optind++;
1732  }
1733  else
1734  {
1735  usage(argv[0]);
1736  exit(1);
1737  }
1738 
1739  if (strcmp(testname, "tests") == 0)
1740  {
1741  print_test_list();
1742  exit(0);
1743  }
1744 
1745  if (optind < argc)
1746  {
1747  conninfo = pg_strdup(argv[optind]);
1748  optind++;
1749  }
1750 
1751  /* Make a connection to the database */
1752  conn = PQconnectdb(conninfo);
1753  if (PQstatus(conn) != CONNECTION_OK)
1754  {
1755  fprintf(stderr, "Connection to database failed: %s\n",
1756  PQerrorMessage(conn));
1757  exit_nicely(conn);
1758  }
1759 
1760  res = PQexec(conn, "SET lc_messages TO \"C\"");
1762  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1763  res = PQexec(conn, "SET debug_parallel_query = off");
1765  pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
1766 
1767  /* Set the trace file, if requested */
1768  if (tracefile != NULL)
1769  {
1770  if (strcmp(tracefile, "-") == 0)
1771  trace = stdout;
1772  else
1773  trace = fopen(tracefile, "w");
1774  if (trace == NULL)
1775  pg_fatal("could not open file \"%s\": %m", tracefile);
1776 
1777  /* Make it line-buffered */
1778  setvbuf(trace, NULL, PG_IOLBF, 0);
1779 
1780  PQtrace(conn, trace);
1783  }
1784 
1785  if (strcmp(testname, "disallowed_in_pipeline") == 0)
1787  else if (strcmp(testname, "multi_pipelines") == 0)
1789  else if (strcmp(testname, "nosync") == 0)
1790  test_nosync(conn);
1791  else if (strcmp(testname, "pipeline_abort") == 0)
1793  else if (strcmp(testname, "pipeline_idle") == 0)
1795  else if (strcmp(testname, "pipelined_insert") == 0)
1796  test_pipelined_insert(conn, numrows);
1797  else if (strcmp(testname, "prepared") == 0)
1799  else if (strcmp(testname, "simple_pipeline") == 0)
1801  else if (strcmp(testname, "singlerow") == 0)
1803  else if (strcmp(testname, "transaction") == 0)
1805  else if (strcmp(testname, "uniqviol") == 0)
1807  else
1808  {
1809  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1810  exit(1);
1811  }
1812 
1813  /* close the connection to the database and cleanup */
1814  PQfinish(conn);
1815  return 0;
1816 }
#define lengthof(array)
Definition: c.h:772
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7173
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7120
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4560
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:728
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7215
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7341
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7199
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1498
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1929
int PQflush(PGconn *conn)
Definition: fe-exec.c:3837
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2273
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3552
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2959
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2928
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3244
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3260
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3314
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2458
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2229
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1957
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3709
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3585
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3777
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1542
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1422
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3145
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3252
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2471
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2004
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1639
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3212
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3299
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3816
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3322
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2035
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
long val
Definition: informix.c:664
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:60
ExecStatusType
Definition: libpq-fe.h:95
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:414
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:158
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:160
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:416
static void usage(const char *progname)
static void const char * fmt
#define MAXINT8LEN
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void const char fflush(stdout)
va_end(args)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
char * tracefile
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
exit(1)
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
va_start(args, fmt)
void pfree(void *pointer)
Definition: mcxt.c:1436
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:50
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
PGDLLIMPORT char * optarg
Definition: getopt.c:52
#define PG_IOLBF
Definition: port.h:361
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
PGconn * conn
Definition: streamutil.c:54
#define EINTR
Definition: win32_port.h:376
#define select(n, r, w, e, timeout)
Definition: win32_port.h:499