PostgreSQL Source Code  git master
libpq_pipeline.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  * Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/time.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "catalog/pg_type_d.h"
24 #include "common/fe_memutils.h"
25 #include "libpq-fe.h"
26 #include "pg_getopt.h"
27 #include "portability/instr_time.h"
28 
29 
/* Forward declaration; the definition appears further down in this file. */
30 static void exit_nicely(PGconn *conn);
31 
/* Program name; pg_fatal_impl() prefixes its error output with it. */
32 const char *const progname = "libpq_pipeline";
33 
34 /* Options and defaults */
35 char *tracefile = NULL; /* path to PQtrace() file */
36 
37 
/*
 * pg_debug() forwards its arguments to fprintf(stderr, ...) when
 * DEBUG_OUTPUT is defined and expands to nothing otherwise.
 */
38 #ifdef DEBUG_OUTPUT
39 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 #else
41 #define pg_debug(...)
42 #endif
43 
/* SQL text used by the tests below (all operate on table pq_pipeline_demo). */
44 static const char *const drop_table_sql =
45 "DROP TABLE IF EXISTS pq_pipeline_demo";
46 static const char *const create_table_sql =
47 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 "int8filler int8);";
49 static const char *const insert_sql =
50 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 static const char *const insert_sql2 =
52 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53 
54 /* buffer sizes for the decimal text of an int32/int64 query parameter, null terminator included */
/*
 * NOTE(review): MAXINTLEN (12) covers 10 digits + sign + terminator.
 * MAXINT8LEN (20) covers 19 digits + terminator only, so a negative 19-digit
 * int64 would not fit; the only int8 value formatted in the visible code is
 * 1LL << 62, which is positive.
 */
55 #define MAXINTLEN 12
56 #define MAXINT8LEN 20
57 
/*
 * Close the connection with PQfinish() and terminate the program with exit
 * status 1.
 *
 * NOTE(review): listing line 59, which should hold the function name and
 * parameter list, is missing from this copy.  The forward declaration near
 * the top of the file suggests "exit_nicely(PGconn *conn)" -- confirm
 * against the real source.
 */
58 static void
60 {
61  PQfinish(conn);
62  exit(1);
63 }
64 
65 /*
66  * Print an error to stderr and terminate the program.
67  */
/*
 * pg_fatal(...) forwards to pg_fatal_impl() and supplies __LINE__, so the
 * message identifies the source line of the failing check.
 *
 * pg_fatal_impl(line, fmt, ...):
 *   line - source line number to report
 *   fmt  - printf-style format; must not end in '\n' (asserted below), since
 *          a newline is appended here
 * Output has the form "\n<progname>:<line>: <message>\n" on stderr, after
 * which the process exits with status 1.  The connection is not closed here.
 *
 * NOTE(review): listing line 70 (between "static void" and the function
 * name) is missing from this copy; its content is unknown.
 */
68 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
69 static void
71 pg_fatal_impl(int line, const char *fmt,...)
72 {
73  va_list args;
74 
75 
 /* presumably flushed so pending stdout text is not emitted after the error */
76  fflush(stdout);
77 
78  fprintf(stderr, "\n%s:%d: ", progname, line);
79  va_start(args, fmt);
80  vfprintf(stderr, fmt, args);
81  va_end(args);
 /*
  * NOTE(review): strlen(fmt) - 1 wraps around for an empty format string, so
  * this would index out of bounds; every pg_fatal() call visible in this
  * listing passes a non-empty literal.
  */
82  Assert(fmt[strlen(fmt) - 1] != '\n');
83  fprintf(stderr, "\n");
84  exit(1);
85 }
86 
/*
 * Check which libpq calls are accepted or rejected around pipeline mode on
 * a blocking connection ("conn"):
 *   - PQexec() must yield PGRES_FATAL_ERROR while in pipeline mode;
 *   - entering pipeline mode twice and exiting it twice must both succeed;
 *   - PQisBusy() must report 0 on an idle pipeline;
 *   - PQexec() must work again after leaving pipeline mode.
 * Any failed expectation ends the program through pg_fatal().
 *
 * NOTE(review): listing line 88, which should hold the function name and
 * parameter list, is missing from this copy.
 * NOTE(review): neither PGresult returned by PQexec() below is passed to
 * PQclear() in this function.
 */
87 static void
89 {
90  PGresult *res = NULL;
91 
92  fprintf(stderr, "test error cases... ");
93 
94  if (PQisnonblocking(conn))
95  pg_fatal("Expected blocking connection mode");
96 
97  if (PQenterPipelineMode(conn) != 1)
98  pg_fatal("Unable to enter pipeline mode");
99 
100  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
101  pg_fatal("Pipeline mode not activated properly");
102 
103  /* PQexec should fail in pipeline mode */
104  res = PQexec(conn, "SELECT 1");
105  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
106  pg_fatal("PQexec should fail in pipeline mode but succeeded");
107 
108  /* Entering pipeline mode when already in pipeline mode is OK */
109  if (PQenterPipelineMode(conn) != 1)
110  pg_fatal("re-entering pipeline mode should be a no-op but failed");
111 
112  if (PQisBusy(conn) != 0)
113  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
114 
115  /* ok, back to normal command mode */
116  if (PQexitPipelineMode(conn) != 1)
117  pg_fatal("couldn't exit idle empty pipeline mode");
118 
119  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
120  pg_fatal("Pipeline mode not terminated properly");
121 
122  /* exiting pipeline mode when not in pipeline mode should be a no-op */
123  if (PQexitPipelineMode(conn) != 1)
124  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
125 
126  /* can now PQexec again */
127  res = PQexec(conn, "SELECT 1");
128  if (PQresultStatus(res) != PGRES_TUPLES_OK)
129  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
130  PQerrorMessage(conn));
131 
132  fprintf(stderr, "ok\n");
133 }
134 
/*
 * Queue two one-query pipelines ("SELECT $1" followed by a sync, twice)
 * before reading anything, then read the results back in order: a
 * tuples-ok result, a NULL separator and a sync result for each pipeline.
 * Along the way it checks that PQexitPipelineMode() is refused while
 * results are still pending, and that it succeeds once everything has been
 * consumed.  Any failed expectation ends the program through pg_fatal().
 *
 * NOTE(review): this copy of the listing has gaps in its line numbering
 * inside this function (the name/parameter line, some "if" conditions and
 * some trailing pg_fatal() argument lines are absent), so several
 * statements below are visibly incomplete.  Restore them from the real
 * source before compiling.
 */
135 static void
137 {
138  PGresult *res = NULL;
139  const char *dummy_params[1] = {"1"};
140  Oid dummy_param_oids[1] = {INT4OID};
141 
142  fprintf(stderr, "multi pipeline... ");
143 
144  /*
145  * Queue up a couple of small pipelines and process each without returning
146  * to command mode first.
147  */
148  if (PQenterPipelineMode(conn) != 1)
149  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
150 
151  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
152  dummy_params, NULL, NULL, 0) != 1)
153  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
154 
155  if (PQpipelineSync(conn) != 1)
156  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
157 
158  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
159  dummy_params, NULL, NULL, 0) != 1)
160  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
161 
162  if (PQpipelineSync(conn) != 1)
163  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
164 
165  /* OK, start processing the results */
166  res = PQgetResult(conn);
167  if (res == NULL)
168  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
169  PQerrorMessage(conn));
170 
171  if (PQresultStatus(res) != PGRES_TUPLES_OK)
172  pg_fatal("Unexpected result code %s from first pipeline item",
 /* NOTE(review): listing line 173 (the argument for %s) is missing here */
174  PQclear(res);
175  res = NULL;
176 
 /* a NULL result marks the end of the first query's results */
177  if (PQgetResult(conn) != NULL)
178  pg_fatal("PQgetResult returned something extra after first result");
179 
 /* the sync result has not been read yet, so leaving pipeline mode must fail */
180  if (PQexitPipelineMode(conn) != 0)
181  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
182 
183  res = PQgetResult(conn);
184  if (res == NULL)
185  pg_fatal("PQgetResult returned null when sync result expected: %s",
186  PQerrorMessage(conn));
187 
 /* NOTE(review): listing lines 188 and 190 (condition and arguments) are missing here */
189  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
191  PQclear(res);
192 
193  /* second pipeline */
194 
195  res = PQgetResult(conn);
196  if (res == NULL)
197  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
198  PQerrorMessage(conn));
199 
200  if (PQresultStatus(res) != PGRES_TUPLES_OK)
201  pg_fatal("Unexpected result code %s from second pipeline item",
 /* NOTE(review): listing line 202 is missing here */
203 
204  res = PQgetResult(conn);
205  if (res != NULL)
206  pg_fatal("Expected null result, got %s",
 /* NOTE(review): listing line 207 is missing here */
208 
209  res = PQgetResult(conn);
210  if (res == NULL)
211  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
212  PQerrorMessage(conn));
213 
 /* NOTE(review): listing lines 214 and 216 are missing here */
215  pg_fatal("Unexpected result code %s from second pipeline sync",
217 
218  /* We're still in pipeline mode ... */
219  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
220  pg_fatal("Fell out of pipeline mode somehow");
221 
222  /* until we end it, which we can safely do now */
223  if (PQexitPipelineMode(conn) != 1)
224  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
225  PQerrorMessage(conn));
226 
227  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
228  pg_fatal("exiting pipeline mode didn't seem to work");
229 
230  fprintf(stderr, "ok\n");
231 }
232 
233 /*
234  * When an operation in a pipeline fails, the rest of the pipeline is flushed. We
235  * still have to get results for each pipeline item, but the item will just be
236  * a PGRES_PIPELINE_ABORTED code.
237  *
238  * This intentionally doesn't use a transaction to wrap the pipeline. You should
239  * usually use an xact, but in this case we want to observe the effects of each
240  * statement.
241  */
/*
 * Outline of the visible code: recreate pq_pipeline_demo, queue a pipeline
 * whose second statement fails plus a second pipeline with one insert, read
 * back and check every result, then run two further failure cases (two
 * statements in one PQsendQuery, and a division by zero in single-row mode)
 * before leaving pipeline mode and verifying that only the row with
 * itemno 3 exists.  Any failed expectation ends the program through
 * pg_fatal().
 *
 * NOTE(review): this copy of the listing has many gaps in its line
 * numbering inside this function (the name/parameter line, several "if"
 * conditions and several trailing pg_fatal() argument lines are absent), so
 * a number of statements below are visibly incomplete.  Restore them from
 * the real source before compiling.
 */
242 static void
244 {
245  PGresult *res = NULL;
246  const char *dummy_params[1] = {"1"};
247  Oid dummy_param_oids[1] = {INT4OID};
248  int i;
249  int gotrows;
250  bool goterror;
251 
252  fprintf(stderr, "aborted pipeline... ");
253 
 /* NOTE(review): the results of these two PQexec() calls are not PQclear()ed */
254  res = PQexec(conn, drop_table_sql);
255  if (PQresultStatus(res) != PGRES_COMMAND_OK)
256  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
257 
258  res = PQexec(conn, create_table_sql);
259  if (PQresultStatus(res) != PGRES_COMMAND_OK)
260  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
261 
262  /*
263  * Queue up a couple of small pipelines and process each without returning
264  * to command mode first. Make sure the second operation in the first
265  * pipeline ERRORs.
266  */
267  if (PQenterPipelineMode(conn) != 1)
268  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
269 
270  dummy_params[0] = "1";
271  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
272  dummy_params, NULL, NULL, 0) != 1)
273  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
274 
275  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
276  1, dummy_param_oids, dummy_params,
277  NULL, NULL, 0) != 1)
278  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
279 
280  dummy_params[0] = "2";
281  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
282  dummy_params, NULL, NULL, 0) != 1)
283  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
284 
285  if (PQpipelineSync(conn) != 1)
286  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
287 
288  dummy_params[0] = "3";
289  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
290  dummy_params, NULL, NULL, 0) != 1)
291  pg_fatal("dispatching second-pipeline insert failed: %s",
292  PQerrorMessage(conn));
293 
294  if (PQpipelineSync(conn) != 1)
295  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
296 
297  /*
298  * OK, start processing the pipeline results.
299  *
300  * We should get a command-ok for the first query, then a fatal error and
301  * a pipeline aborted message for the second insert, a pipeline-end, then
302  * a command-ok and a pipeline-ok for the second pipeline operation.
303  */
304  res = PQgetResult(conn);
305  if (res == NULL)
306  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
307  if (PQresultStatus(res) != PGRES_COMMAND_OK)
308  pg_fatal("Unexpected result status %s: %s",
310  PQresultErrorMessage(res));
311  PQclear(res);
312 
313  /* NULL result to signal end-of-results for this command */
314  if ((res = PQgetResult(conn)) != NULL)
315  pg_fatal("Expected null result, got %s",
317 
318  /* Second query caused error, so we expect an error next */
319  res = PQgetResult(conn);
320  if (res == NULL)
321  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
322  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
323  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
325  PQclear(res);
326 
327  /* NULL result to signal end-of-results for this command */
328  if ((res = PQgetResult(conn)) != NULL)
329  pg_fatal("Expected null result, got %s",
331 
332  /*
333  * pipeline should now be aborted.
334  *
335  * Note that we could still queue more queries at this point if we wanted;
336  * they'd get added to a new third pipeline since we've already sent a
337  * second. The aborted flag relates only to the pipeline being received.
338  */
 /* NOTE(review): the condition guarding this call (listing line 339) is missing */
340  pg_fatal("pipeline should be flagged as aborted but isn't");
341 
342  /* third query in pipeline, the second insert */
343  res = PQgetResult(conn);
344  if (res == NULL)
345  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
347  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
349  PQclear(res);
350 
351  /* NULL result to signal end-of-results for this command */
352  if ((res = PQgetResult(conn)) != NULL)
353  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
354 
 /* NOTE(review): the condition guarding this call (listing line 355) is missing */
356  pg_fatal("pipeline should be flagged as aborted but isn't");
357 
358  /* Ensure we're still in pipeline */
359  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
360  pg_fatal("Fell out of pipeline mode somehow");
361 
362  /*
363  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
364  *
365  * (This is so clients know to start processing results normally again and
366  * can tell the difference between skipped commands and the sync.)
367  */
368  res = PQgetResult(conn);
369  if (res == NULL)
370  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
372  pg_fatal("Unexpected result code from first pipeline sync\n"
373  "Expected PGRES_PIPELINE_SYNC, got %s",
375  PQclear(res);
376 
 /* NOTE(review): the condition guarding this call (listing line 377) is missing */
378  pg_fatal("sync should've cleared the aborted flag but didn't");
379 
380  /* We're still in pipeline mode... */
381  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
382  pg_fatal("Fell out of pipeline mode somehow");
383 
384  /* the insert from the second pipeline */
385  res = PQgetResult(conn);
386  if (res == NULL)
387  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
388  if (PQresultStatus(res) != PGRES_COMMAND_OK)
389  pg_fatal("Unexpected result code %s from first item in second pipeline",
391  PQclear(res);
392 
393  /* Read the NULL result at the end of the command */
394  if ((res = PQgetResult(conn)) != NULL)
395  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
396 
397  /* the second pipeline sync */
398  if ((res = PQgetResult(conn)) == NULL)
399  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
401  pg_fatal("Unexpected result code %s from second pipeline sync",
403  PQclear(res);
404 
405  if ((res = PQgetResult(conn)) != NULL)
406  pg_fatal("Expected null result, got %s: %s",
408  PQerrorMessage(conn));
409 
410  /* Try to send two queries in one command */
411  if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
412  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
413  if (PQpipelineSync(conn) != 1)
414  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
415  goterror = false;
 /*
  * Only an error result with SQLSTATE 42601 is accepted here.
  * NOTE(review): no PQclear(res) appears inside this loop, and the strcmp()
  * assumes PQresultErrorField() returned a non-NULL string.
  */
416  while ((res = PQgetResult(conn)) != NULL)
417  {
418  switch (PQresultStatus(res))
419  {
420  case PGRES_FATAL_ERROR:
421  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
422  pg_fatal("expected error about multiple commands, got %s",
423  PQerrorMessage(conn));
424  printf("got expected %s", PQerrorMessage(conn));
425  goterror = true;
426  break;
427  default:
428  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
429  break;
430  }
431  }
432  if (!goterror)
433  pg_fatal("did not get cannot-insert-multiple-commands error");
434  res = PQgetResult(conn);
435  if (res == NULL)
436  pg_fatal("got NULL result");
438  pg_fatal("Unexpected result code %s from pipeline sync",
 /* NOTE(review): "ok" is printed here and again near the end of the function */
440  fprintf(stderr, "ok\n");
441 
442  /* Test single-row mode with an error partway */
443  if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
444  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
445  if (PQpipelineSync(conn) != 1)
446  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
 /* the return value of PQsetSingleRowMode() is not checked here */
447  PQsetSingleRowMode(conn);
448  goterror = false;
449  gotrows = 0;
 /* expect single-tuple results (counted in gotrows) followed by SQLSTATE 22012 */
450  while ((res = PQgetResult(conn)) != NULL)
451  {
452  switch (PQresultStatus(res))
453  {
454  case PGRES_SINGLE_TUPLE:
455  printf("got row: %s\n", PQgetvalue(res, 0, 0));
456  gotrows++;
457  break;
458  case PGRES_FATAL_ERROR:
459  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
460  pg_fatal("expected division-by-zero, got: %s (%s)",
461  PQerrorMessage(conn),
463  printf("got expected division-by-zero\n");
464  goterror = true;
465  break;
466  default:
467  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
468  }
469  PQclear(res);
470  }
471  if (!goterror)
472  pg_fatal("did not get division-by-zero error");
473  if (gotrows != 3)
474  pg_fatal("did not get three rows");
475  /* the third pipeline sync */
476  if ((res = PQgetResult(conn)) == NULL)
477  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
479  pg_fatal("Unexpected result code %s from third pipeline sync",
481  PQclear(res);
482 
483  /* We're still in pipeline mode... */
484  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
485  pg_fatal("Fell out of pipeline mode somehow");
486 
487  /* until we end it, which we can safely do now */
488  if (PQexitPipelineMode(conn) != 1)
489  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
490  PQerrorMessage(conn));
491 
492  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
493  pg_fatal("exiting pipeline mode didn't seem to work");
494 
495  fprintf(stderr, "ok\n");
496 
497  /*-
498  * Since we fired the pipelines off without a surrounding xact, the results
499  * should be:
500  *
501  * - Implicit xact started by server around 1st pipeline
502  * - First insert applied
503  * - Second statement aborted xact
504  * - Third insert skipped
505  * - Sync rolled back first implicit xact
506  * - Implicit xact created by server around 2nd pipeline
507  * - insert applied from 2nd pipeline
508  * - Sync commits 2nd xact
509  *
510  * So we should only have the value 3 that we inserted.
511  */
512  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
513 
514  if (PQresultStatus(res) != PGRES_TUPLES_OK)
515  pg_fatal("Expected tuples, got %s: %s",
517  if (PQntuples(res) != 1)
518  pg_fatal("expected 1 result, got %d", PQntuples(res));
519  for (i = 0; i < PQntuples(res); i++)
520  {
521  const char *val = PQgetvalue(res, i, 0);
522 
523  if (strcmp(val, "3") != 0)
524  pg_fatal("expected only insert with value 3, got %s", val);
525  }
526 
527  PQclear(res);
528 }
529 
530 /* State machine enum for test_pipelined_insert */
/*
 * NOTE(review): the listing lines that carried the enum tag and its
 * enumerators (531 and 533-540) are missing from this copy.  Code later in
 * this file declares variables of type "enum PipelineInsertStep" and uses
 * the values BI_BEGIN_TX, BI_DROP_TABLE, BI_CREATE_TABLE, BI_PREPARE,
 * BI_INSERT_ROWS, BI_COMMIT_TX, BI_SYNC and BI_DONE, advancing between them
 * with ++, so presumably they are declared here in that order -- confirm
 * against the real source.
 */
532 {
541 };
542 
/*
 * Insert n_rows rows through one pipeline while reading results
 * concurrently.
 *
 * Sending side (tracked by send_step): BEGIN, DROP TABLE, CREATE TABLE and
 * the PREPARE of "my_insert" are queued up front in blocking mode; the
 * connection is then switched to nonblocking mode and the row inserts,
 * COMMIT and pipeline sync are issued one per loop iteration whenever
 * select() reports the socket writable.
 *
 * Receiving side (tracked by recv_step): whenever the socket is readable,
 * input is consumed and each available result is checked against the status
 * and command tag expected for the current step.  The loop ends when the
 * sync result has been received (recv_step == BI_DONE), after which
 * pipeline mode and nonblocking mode are both turned off.
 *
 * Uses "conn" (the connection) and "n_rows" (number of rows to insert).
 * Any failed expectation ends the program through pg_fatal().
 *
 * NOTE(review): listing line 544, which should hold the function name and
 * parameter list, is missing from this copy.
 */
543 static void
545 {
546  Oid insert_param_oids[2] = {INT4OID, INT8OID};
547  const char *insert_params[2];
548  char insert_param_0[MAXINTLEN];
549  char insert_param_1[MAXINT8LEN];
550  enum PipelineInsertStep send_step = BI_BEGIN_TX,
551  recv_step = BI_BEGIN_TX;
552  int rows_to_send,
553  rows_to_receive;
554 
 /* the parameter array points at the two text buffers refilled for each row */
555  insert_params[0] = insert_param_0;
556  insert_params[1] = insert_param_1;
557 
558  rows_to_send = rows_to_receive = n_rows;
559 
560  /*
561  * Do a pipelined insert into a table created at the start of the pipeline
562  */
563  if (PQenterPipelineMode(conn) != 1)
564  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
565 
 /* queue the setup statements; each pass sends one and advances send_step */
566  while (send_step != BI_PREPARE)
567  {
568  const char *sql;
569 
570  switch (send_step)
571  {
572  case BI_BEGIN_TX:
573  sql = "BEGIN TRANSACTION";
574  send_step = BI_DROP_TABLE;
575  break;
576 
577  case BI_DROP_TABLE:
578  sql = drop_table_sql;
579  send_step = BI_CREATE_TABLE;
580  break;
581 
582  case BI_CREATE_TABLE:
583  sql = create_table_sql;
584  send_step = BI_PREPARE;
585  break;
586 
587  default:
588  pg_fatal("invalid state");
589  sql = NULL; /* keep compiler quiet */
590  }
591 
592  pg_debug("sending: %s\n", sql);
593  if (PQsendQueryParams(conn, sql,
594  0, NULL, NULL, NULL, NULL, 0) != 1)
595  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
596  }
597 
598  Assert(send_step == BI_PREPARE);
599  pg_debug("sending: %s\n", insert_sql2);
600  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
601  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
602  send_step = BI_INSERT_ROWS;
603 
604  /*
605  * Now we start inserting. We'll be sending enough data that we could fill
606  * our output buffer, so to avoid deadlocking we need to enter nonblocking
607  * mode and consume input while we send more output. As results of each
608  * query are processed we should pop them to allow processing of the next
609  * query. There's no need to finish the pipeline before processing
610  * results.
611  */
612  if (PQsetnonblocking(conn, 1) != 0)
613  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
614 
615  while (recv_step != BI_DONE)
616  {
617  int sock;
618  fd_set input_mask;
619  fd_set output_mask;
620 
621  sock = PQsocket(conn);
622 
623  if (sock < 0)
624  break; /* shouldn't happen */
625 
 /* wait (no timeout) until the socket is readable and/or writable */
626  FD_ZERO(&input_mask);
627  FD_SET(sock, &input_mask);
628  FD_ZERO(&output_mask);
629  FD_SET(sock, &output_mask);
630 
631  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
632  {
633  fprintf(stderr, "select() failed: %s\n", strerror(errno));
634  exit_nicely(conn);
635  }
636 
637  /*
638  * Process any results, so we keep the server's output buffer free
639  * flowing and it can continue to process input
640  */
641  if (FD_ISSET(sock, &input_mask))
642  {
 /* NOTE(review): the return value of PQconsumeInput() is not checked */
643  PQconsumeInput(conn);
644 
645  /* Read until we'd block if we tried to read */
646  while (!PQisBusy(conn) && recv_step < BI_DONE)
647  {
648  PGresult *res;
649  const char *cmdtag = "";
650  const char *description = "";
651  int status;
652 
653  /*
654  * Read next result. If no more results from this query,
655  * advance to the next query
656  */
657  res = PQgetResult(conn);
658  if (res == NULL)
659  continue;
660 
 /* expected status defaults to COMMAND_OK; only the sync step overrides it */
661  status = PGRES_COMMAND_OK;
662  switch (recv_step)
663  {
664  case BI_BEGIN_TX:
665  cmdtag = "BEGIN";
666  recv_step++;
667  break;
668  case BI_DROP_TABLE:
669  cmdtag = "DROP TABLE";
670  recv_step++;
671  break;
672  case BI_CREATE_TABLE:
673  cmdtag = "CREATE TABLE";
674  recv_step++;
675  break;
676  case BI_PREPARE:
677  cmdtag = "";
678  description = "PREPARE";
679  recv_step++;
680  break;
681  case BI_INSERT_ROWS:
682  cmdtag = "INSERT";
 /* stay in this step until a result has arrived for every row */
683  rows_to_receive--;
684  if (rows_to_receive == 0)
685  recv_step++;
686  break;
687  case BI_COMMIT_TX:
688  cmdtag = "COMMIT";
689  recv_step++;
690  break;
691  case BI_SYNC:
692  cmdtag = "";
693  description = "SYNC";
694  status = PGRES_PIPELINE_SYNC;
695  recv_step++;
696  break;
697  case BI_DONE:
698  /* unreachable */
699  pg_fatal("unreachable state");
700  }
701 
702  if (PQresultStatus(res) != status)
703  pg_fatal("%s reported status %s, expected %s\n"
704  "Error message: \"%s\"",
705  description, PQresStatus(PQresultStatus(res)),
706  PQresStatus(status), PQerrorMessage(conn));
707 
 /* prefix comparison; an empty cmdtag therefore matches any command status */
708  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
709  pg_fatal("%s expected command tag '%s', got '%s'",
710  description, cmdtag, PQcmdStatus(res));
711 
712  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
713 
714  PQclear(res);
715  }
716  }
717 
718  /* Write more rows and/or the end pipeline message, if needed */
719  if (FD_ISSET(sock, &output_mask))
720  {
 /* NOTE(review): the return value of PQflush() is not checked */
721  PQflush(conn);
722 
723  if (send_step == BI_INSERT_ROWS)
724  {
 /* itemno is the remaining-row counter, so values count down from n_rows */
725  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
726  /* use up some buffer space with a wide value */
727  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
728 
729  if (PQsendQueryPrepared(conn, "my_insert",
730  2, insert_params, NULL, NULL, 0) == 1)
731  {
732  pg_debug("sent row %d\n", rows_to_send);
733 
734  rows_to_send--;
735  if (rows_to_send == 0)
736  send_step++;
737  }
738  else
739  {
740  /*
741  * in nonblocking mode, so it's OK for an insert to fail
742  * to send
743  */
744  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
745  rows_to_send, PQerrorMessage(conn));
746  }
747  }
748  else if (send_step == BI_COMMIT_TX)
749  {
750  if (PQsendQueryParams(conn, "COMMIT",
751  0, NULL, NULL, NULL, NULL, 0) == 1)
752  {
753  pg_debug("sent COMMIT\n");
754  send_step++;
755  }
756  else
757  {
758  fprintf(stderr, "WARNING: failed to send commit: %s\n",
759  PQerrorMessage(conn));
760  }
761  }
762  else if (send_step == BI_SYNC)
763  {
764  if (PQpipelineSync(conn) == 1)
765  {
766  fprintf(stdout, "pipeline sync sent\n");
767  send_step++;
768  }
769  else
770  {
771  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
772  PQerrorMessage(conn));
773  }
774  }
775  }
776  }
777 
778  /* We've got the sync message and the pipeline should be done */
779  if (PQexitPipelineMode(conn) != 1)
780  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
781  PQerrorMessage(conn));
782 
783  if (PQsetnonblocking(conn, 0) != 0)
784  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
785 
786  fprintf(stderr, "ok\n");
787 }
788 
/*
 * Exercise prepare/describe requests inside a pipeline.
 *
 * First pipeline: prepare "select_one", describe it, sync; then check the
 * prepare result, and check that the describe result reports four columns
 * whose type OIDs match expected_oids.  Second pipeline: after declaring a
 * cursor in normal mode, describe the portal "cursor_one" and check that
 * its first column is INT4OID.  Any failed expectation ends the program
 * through pg_fatal().
 *
 * NOTE(review): this copy of the listing has gaps in its line numbering
 * inside this function (the name/parameter line 790 and the conditions at
 * lines 845 and 873 are absent), so the two "expected PGRES_PIPELINE_SYNC"
 * calls below appear without their guards.  Restore them from the real
 * source before compiling.
 */
789 static void
791 {
792  PGresult *res = NULL;
793  Oid param_oids[1] = {INT4OID};
794  Oid expected_oids[4];
795  Oid typ;
796 
797  fprintf(stderr, "prepared... ");
798 
799  if (PQenterPipelineMode(conn) != 1)
800  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
801  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
802  "interval '1 sec'",
803  1, param_oids) != 1)
804  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
 /* column types the describe result is compared against, in column order */
805  expected_oids[0] = INT4OID;
806  expected_oids[1] = TEXTOID;
807  expected_oids[2] = NUMERICOID;
808  expected_oids[3] = INTERVALOID;
809  if (PQsendDescribePrepared(conn, "select_one") != 1)
810  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
811  if (PQpipelineSync(conn) != 1)
812  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
813 
 /* result of the prepare, followed by its NULL terminator */
814  res = PQgetResult(conn);
815  if (res == NULL)
816  pg_fatal("PQgetResult returned null");
817  if (PQresultStatus(res) != PGRES_COMMAND_OK)
818  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
819  PQclear(res);
820  res = PQgetResult(conn);
821  if (res != NULL)
822  pg_fatal("expected NULL result");
823 
 /* result of the describe: column metadata is read from it below */
824  res = PQgetResult(conn);
825  if (res == NULL)
826  pg_fatal("PQgetResult returned NULL");
827  if (PQresultStatus(res) != PGRES_COMMAND_OK)
828  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
829  if (PQnfields(res) != lengthof(expected_oids))
830  pg_fatal("expected %d columns, got %d",
831  lengthof(expected_oids), PQnfields(res));
832  for (int i = 0; i < PQnfields(res); i++)
833  {
834  typ = PQftype(res, i);
835  if (typ != expected_oids[i])
836  pg_fatal("field %d: expected type %u, got %u",
837  i, expected_oids[i], typ);
838  }
839  PQclear(res);
840  res = PQgetResult(conn);
841  if (res != NULL)
842  pg_fatal("expected NULL result");
843 
844  res = PQgetResult(conn);
 /* NOTE(review): the condition guarding this call (listing line 845) is missing */
846  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
847 
848  if (PQexitPipelineMode(conn) != 1)
849  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
850 
 /*
  * NOTE(review): the results of these two PQexec() calls are neither checked
  * nor PQclear()ed, and the return value of PQenterPipelineMode() is ignored.
  */
851  PQexec(conn, "BEGIN");
852  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
853  PQenterPipelineMode(conn);
854  if (PQsendDescribePortal(conn, "cursor_one") != 1)
855  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
856  if (PQpipelineSync(conn) != 1)
857  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
858  res = PQgetResult(conn);
 /* NOTE(review): this fires when res IS NULL, so the message text looks inverted */
859  if (res == NULL)
860  pg_fatal("expected NULL result");
861  if (PQresultStatus(res) != PGRES_COMMAND_OK)
862  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
863 
864  typ = PQftype(res, 0);
865  if (typ != INT4OID)
866  pg_fatal("portal: expected type %u, got %u",
867  INT4OID, typ);
868  PQclear(res);
869  res = PQgetResult(conn);
870  if (res != NULL)
871  pg_fatal("expected NULL result");
872  res = PQgetResult(conn);
 /* NOTE(review): the condition guarding this call (listing line 873) is missing */
874  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
875 
876  if (PQexitPipelineMode(conn) != 1)
877  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
878 
879  fprintf(stderr, "ok\n");
880 }
881 
/*
 * Smallest pipeline round trip on a blocking connection: enter pipeline
 * mode, queue one "SELECT $1", sync, then read the tuples-ok result, its
 * NULL terminator and the sync result.  PQexitPipelineMode() is expected to
 * be refused (return 0) both while the query is unsynced and while the sync
 * result is still unread, and to succeed once everything has been consumed.
 * Any failed expectation ends the program through pg_fatal().
 *
 * NOTE(review): this copy of the listing has gaps in its line numbering
 * inside this function (the name/parameter line 883, the condition at line
 * 943 and the argument lines 923, 945 and 952 are absent), so several
 * statements below are visibly incomplete.  Restore them from the real
 * source before compiling.
 */
882 static void
884 {
885  PGresult *res = NULL;
886  const char *dummy_params[1] = {"1"};
887  Oid dummy_param_oids[1] = {INT4OID};
888 
889  fprintf(stderr, "simple pipeline... ");
890 
891  /*
892  * Enter pipeline mode and dispatch a set of operations, which we'll then
893  * process the results of as they come in.
894  *
895  * For a simple case we should be able to do this without interim
896  * processing of results since our output buffer will give us enough slush
897  * to work with and we won't block on sending. So blocking mode is fine.
898  */
899  if (PQisnonblocking(conn))
900  pg_fatal("Expected blocking connection mode");
901 
902  if (PQenterPipelineMode(conn) != 1)
903  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
904 
905  if (PQsendQueryParams(conn, "SELECT $1",
906  1, dummy_param_oids, dummy_params,
907  NULL, NULL, 0) != 1)
908  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
909 
 /* a query is queued, so leaving pipeline mode is expected to be refused */
910  if (PQexitPipelineMode(conn) != 0)
911  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
912 
913  if (PQpipelineSync(conn) != 1)
914  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
915 
916  res = PQgetResult(conn);
917  if (res == NULL)
918  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
919  PQerrorMessage(conn));
920 
921  if (PQresultStatus(res) != PGRES_TUPLES_OK)
922  pg_fatal("Unexpected result code %s from first pipeline item",
 /* NOTE(review): listing line 923 (the argument for %s) is missing here */
924 
925  PQclear(res);
926  res = NULL;
927 
928  if (PQgetResult(conn) != NULL)
929  pg_fatal("PQgetResult returned something extra after first query result.");
930 
931  /*
932  * Even though we've processed the result there's still a sync to come and
933  * we can't exit pipeline mode yet
934  */
935  if (PQexitPipelineMode(conn) != 0)
936  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
937 
938  res = PQgetResult(conn);
939  if (res == NULL)
940  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
941  PQerrorMessage(conn));
942 
 /* NOTE(review): listing lines 943 and 945 (condition and arguments) are missing here */
944  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
946 
947  PQclear(res);
948  res = NULL;
949 
950  if (PQgetResult(conn) != NULL)
951  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
 /* NOTE(review): listing line 952 (the argument for %s) is missing here */
953 
954  /* We're still in pipeline mode... */
955  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
956  pg_fatal("Fell out of pipeline mode somehow");
957 
958  /* ... until we end it, which we can safely do now */
959  if (PQexitPipelineMode(conn) != 1)
960  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
961  PQerrorMessage(conn));
962 
963  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
964  pg_fatal("Exiting pipeline mode didn't seem to work");
965 
966  fprintf(stderr, "ok\n");
967 }
968 
/*
 * Check single-row mode inside a pipeline.
 *
 * Three "SELECT generate_series(42, $1)" queries (with $1 = 44, 45, 46) are
 * queued in one pipeline.  While reading results, single-row mode is
 * requested for the first two queries only, so those must start with
 * PGRES_SINGLE_TUPLE results and finish with a zero-row PGRES_TUPLES_OK,
 * while the third must deliver PGRES_TUPLES_OK directly.  The outer loop
 * ends when the pipeline sync result is seen, which must happen after
 * exactly three queries.  Any failed expectation ends the program through
 * pg_fatal().
 *
 * NOTE(review): listing line 970, which should hold the function name and
 * parameter list, is missing from this copy.
 */
969 static void
971 {
972  PGresult *res;
973  int i;
974  bool pipeline_ended = false;
975 
976  /* 1 pipeline, 3 queries in it */
977  if (PQenterPipelineMode(conn) != 1)
978  pg_fatal("failed to enter pipeline mode: %s",
979  PQerrorMessage(conn));
980 
981  for (i = 0; i < 3; i++)
982  {
983  char *param[1];
984 
 /* parameter text is allocated per query and released right after sending */
985  param[0] = psprintf("%d", 44 + i);
986 
987  if (PQsendQueryParams(conn,
988  "SELECT generate_series(42, $1)",
989  1,
990  NULL,
991  (const char **) param,
992  NULL,
993  NULL,
994  0) != 1)
995  pg_fatal("failed to send query: %s",
996  PQerrorMessage(conn));
997  pfree(param[0]);
998  }
999  if (PQpipelineSync(conn) != 1)
1000  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1001 
 /* one outer iteration per query; i doubles as the query index */
1002  for (i = 0; !pipeline_ended; i++)
1003  {
1004  bool first = true;
1005  bool saw_ending_tuplesok;
1006  bool isSingleTuple = false;
1007 
1008  /* Set single row mode for only first 2 SELECT queries */
1009  if (i < 2)
1010  {
1011  if (PQsetSingleRowMode(conn) != 1)
1012  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1013  }
1014 
1015  /* Consume rows for this query */
1016  saw_ending_tuplesok = false;
1017  while ((res = PQgetResult(conn)) != NULL)
1018  {
1019  ExecStatusType est = PQresultStatus(res);
1020 
1021  if (est == PGRES_PIPELINE_SYNC)
1022  {
1023  fprintf(stderr, "end of pipeline reached\n");
1024  pipeline_ended = true;
1025  PQclear(res);
 /* the sync must arrive on the iteration after the third query */
1026  if (i != 3)
1027  pg_fatal("Expected three results, got %d", i);
1028  break;
1029  }
1030 
1031  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1032  if (first)
1033  {
1034  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1035  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1036  i, PQresStatus(est));
1037  if (i >= 2 && est != PGRES_TUPLES_OK)
1038  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1039  i, PQresStatus(est));
1040  first = false;
1041  }
1042 
1043  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1044  switch (est)
1045  {
1046  case PGRES_TUPLES_OK:
1047  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1048  saw_ending_tuplesok = true;
 /* after single-tuple results, the closing TUPLES_OK must carry no rows */
1049  if (isSingleTuple)
1050  {
1051  if (PQntuples(res) == 0)
1052  fprintf(stderr, "all tuples received in query %d\n", i);
1053  else
1054  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1055  }
1056  break;
1057 
1058  case PGRES_SINGLE_TUPLE:
1059  isSingleTuple = true;
1060  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1061  break;
1062 
1063  default:
1064  pg_fatal("unexpected");
1065  }
1066  PQclear(res);
1067  }
1068  if (!pipeline_ended && !saw_ending_tuplesok)
1069  pg_fatal("didn't get expected terminating TUPLES_OK");
1070  }
1071 
1072  if (PQexitPipelineMode(conn) != 1)
1073  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1074 }
1075 
1076 /*
1077  * Simple test to verify that a pipeline is discarded as a whole when there's
1078  * an error, ignoring transaction commands.
1079  */
1080 static void
1082 {
1083  PGresult *res;
1084  bool expect_null;
1085  int num_syncs = 0;
1086 
1087  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1088  "CREATE TABLE pq_pipeline_tst (id int)");
1089  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1090  pg_fatal("failed to create test table: %s",
1091  PQerrorMessage(conn));
1092  PQclear(res);
1093 
1094  if (PQenterPipelineMode(conn) != 1)
1095  pg_fatal("failed to enter pipeline mode: %s",
1096  PQerrorMessage(conn));
1097  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1098  pg_fatal("could not send prepare on pipeline: %s",
1099  PQerrorMessage(conn));
1100 
1101  if (PQsendQueryParams(conn,
1102  "BEGIN",
1103  0, NULL, NULL, NULL, NULL, 0) != 1)
1104  pg_fatal("failed to send query: %s",
1105  PQerrorMessage(conn));
1106  if (PQsendQueryParams(conn,
1107  "SELECT 0/0",
1108  0, NULL, NULL, NULL, NULL, 0) != 1)
1109  pg_fatal("failed to send query: %s",
1110  PQerrorMessage(conn));
1111 
1112  /*
1113  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1114  * get out of the pipeline-aborted state first.
1115  */
1116  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1117  pg_fatal("failed to execute prepared: %s",
1118  PQerrorMessage(conn));
1119 
1120  /* This insert fails because we're in pipeline-aborted state */
1121  if (PQsendQueryParams(conn,
1122  "INSERT INTO pq_pipeline_tst VALUES (1)",
1123  0, NULL, NULL, NULL, NULL, 0) != 1)
1124  pg_fatal("failed to send query: %s",
1125  PQerrorMessage(conn));
1126  if (PQpipelineSync(conn) != 1)
1127  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1128  num_syncs++;
1129 
1130  /*
1131  * This insert fails even though the pipeline got a SYNC, because we're in
1132  * an aborted transaction
1133  */
1134  if (PQsendQueryParams(conn,
1135  "INSERT INTO pq_pipeline_tst VALUES (2)",
1136  0, NULL, NULL, NULL, NULL, 0) != 1)
1137  pg_fatal("failed to send query: %s",
1138  PQerrorMessage(conn));
1139  if (PQpipelineSync(conn) != 1)
1140  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1141  num_syncs++;
1142 
1143  /*
1144  * Send ROLLBACK using prepared stmt. This one works because we just did
1145  * PQpipelineSync above.
1146  */
1147  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1148  pg_fatal("failed to execute prepared: %s",
1149  PQerrorMessage(conn));
1150 
1151  /*
1152  * Now that we're out of a transaction and in pipeline-good mode, this
1153  * insert works
1154  */
1155  if (PQsendQueryParams(conn,
1156  "INSERT INTO pq_pipeline_tst VALUES (3)",
1157  0, NULL, NULL, NULL, NULL, 0) != 1)
1158  pg_fatal("failed to send query: %s",
1159  PQerrorMessage(conn));
1160  /* Send two syncs now -- match up to SYNC messages below */
1161  if (PQpipelineSync(conn) != 1)
1162  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1163  num_syncs++;
1164  if (PQpipelineSync(conn) != 1)
1165  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1166  num_syncs++;
1167 
1168  expect_null = false;
1169  for (int i = 0;; i++)
1170  {
1171  ExecStatusType restype;
1172 
1173  res = PQgetResult(conn);
1174  if (res == NULL)
1175  {
1176  printf("%d: got NULL result\n", i);
1177  if (!expect_null)
1178  pg_fatal("did not expect NULL here");
1179  expect_null = false;
1180  continue;
1181  }
1182  restype = PQresultStatus(res);
1183  printf("%d: got status %s", i, PQresStatus(restype));
1184  if (expect_null)
1185  pg_fatal("expected NULL");
1186  if (restype == PGRES_FATAL_ERROR)
1187  printf("; error: %s", PQerrorMessage(conn));
1188  else if (restype == PGRES_PIPELINE_ABORTED)
1189  {
1190  printf(": command didn't run because pipeline aborted\n");
1191  }
1192  else
1193  printf("\n");
1194  PQclear(res);
1195 
1196  if (restype == PGRES_PIPELINE_SYNC)
1197  num_syncs--;
1198  else
1199  expect_null = true;
1200  if (num_syncs <= 0)
1201  break;
1202  }
1203  if (PQgetResult(conn) != NULL)
1204  pg_fatal("returned something extra after all the syncs: %s",
1205  PQresStatus(PQresultStatus(res)));
1206 
1207  if (PQexitPipelineMode(conn) != 1)
1208  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1209 
1210  /* We expect to find one tuple containing the value "3" */
1211  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1212  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1213  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1214  if (PQntuples(res) != 1)
1215  pg_fatal("did not get 1 tuple");
1216  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1217  pg_fatal("did not get expected tuple");
1218  PQclear(res);
1219 
1220  fprintf(stderr, "ok\n");
1221 }
1222 
/*
 * Write the command-line synopsis and option summary to stderr.
 *
 * progname is the executable name to show in the text.  Note that this
 * parameter shadows the file-level "progname" constant inside this function.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	/* Banner and the two accepted invocation forms */
	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* Option summary; these lines need no formatting */
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
1234 
/*
 * Print the name of every available test to stdout, one per line.
 *
 * The names listed here are the same ones main() accepts as TESTNAME; keep
 * the two in sync when adding or removing a test.
 */
static void
print_test_list(void)
{
	printf("disallowed_in_pipeline\n");
	printf("multi_pipelines\n");
	printf("pipeline_abort\n");
	printf("pipelined_insert\n");
	printf("prepared\n");
	printf("simple_pipeline\n");
	printf("singlerow\n");
	printf("transaction\n");
}
1247 
1248 int
1249 main(int argc, char **argv)
1250 {
1251  const char *conninfo = "";
1252  PGconn *conn;
1253  FILE *trace;
1254  char *testname;
1255  int numrows = 10000;
1256  PGresult *res;
1257  int c;
1258 
1259  while ((c = getopt(argc, argv, "t:r:")) != -1)
1260  {
1261  switch (c)
1262  {
1263  case 't': /* trace file */
1265  break;
1266  case 'r': /* numrows */
1267  errno = 0;
1268  numrows = strtol(optarg, NULL, 10);
1269  if (errno != 0 || numrows <= 0)
1270  {
1271  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1272  optarg);
1273  exit(1);
1274  }
1275  break;
1276  }
1277  }
1278 
1279  if (optind < argc)
1280  {
1281  testname = pg_strdup(argv[optind]);
1282  optind++;
1283  }
1284  else
1285  {
1286  usage(argv[0]);
1287  exit(1);
1288  }
1289 
1290  if (strcmp(testname, "tests") == 0)
1291  {
1292  print_test_list();
1293  exit(0);
1294  }
1295 
1296  if (optind < argc)
1297  {
1298  conninfo = pg_strdup(argv[optind]);
1299  optind++;
1300  }
1301 
1302  /* Make a connection to the database */
1303  conn = PQconnectdb(conninfo);
1304  if (PQstatus(conn) != CONNECTION_OK)
1305  {
1306  fprintf(stderr, "Connection to database failed: %s\n",
1307  PQerrorMessage(conn));
1308  exit_nicely(conn);
1309  }
1310 
1311  res = PQexec(conn, "SET lc_messages TO \"C\"");
1312  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1313  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1314  res = PQexec(conn, "SET force_parallel_mode = off");
1315  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1316  pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
1317 
1318  /* Set the trace file, if requested */
1319  if (tracefile != NULL)
1320  {
1321  trace = fopen(tracefile, "w");
1322  if (trace == NULL)
1323  pg_fatal("could not open file \"%s\": %m", tracefile);
1324 
1325  /* Make it line-buffered */
1326  setvbuf(trace, NULL, PG_IOLBF, 0);
1327 
1328  PQtrace(conn, trace);
1329  PQtraceSetFlags(conn,
1331  }
1332 
1333  if (strcmp(testname, "disallowed_in_pipeline") == 0)
1335  else if (strcmp(testname, "multi_pipelines") == 0)
1336  test_multi_pipelines(conn);
1337  else if (strcmp(testname, "pipeline_abort") == 0)
1338  test_pipeline_abort(conn);
1339  else if (strcmp(testname, "pipelined_insert") == 0)
1340  test_pipelined_insert(conn, numrows);
1341  else if (strcmp(testname, "prepared") == 0)
1342  test_prepared(conn);
1343  else if (strcmp(testname, "simple_pipeline") == 0)
1344  test_simple_pipeline(conn);
1345  else if (strcmp(testname, "singlerow") == 0)
1346  test_singlerowmode(conn);
1347  else if (strcmp(testname, "transaction") == 0)
1348  test_transaction(conn);
1349  else
1350  {
1351  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1352  exit(1);
1353  }
1354 
1355  /* close the connection to the database and cleanup */
1356  PQfinish(conn);
1357  return 0;
1358 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3175
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6735
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3666
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2876
static const char *const insert_sql2
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1387
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3561
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2844
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2384
PipelineInsertStep
const char *const progname
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3105
static void test_singlerowmode(PGconn *conn)
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4229
#define printf(...)
Definition: port.h:222
ExecStatusType
Definition: libpq-fe.h:83
#define pg_debug(...)
#define lengthof(array)
Definition: c.h:734
static void test_prepared(PGconn *conn)
unsigned int Oid
Definition: postgres_ext.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3167
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define fprintf
Definition: port.h:220
static void test_multi_pipelines(PGconn *conn)
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
static void test_pipeline_abort(PGconn *conn)
static void test_disallowed_in_pipeline(PGconn *conn)
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1279
static const char *const create_table_sql
void pfree(void *pointer)
Definition: mcxt.c:1169
int optind
Definition: getopt.c:50
PGconn * conn
Definition: streamutil.c:54
int PQflush(PGconn *conn)
Definition: fe-exec.c:3685
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3405
static void test_transaction(PGconn *conn)
char * c
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1825
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2371
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
static const char *const insert_sql
char * tracefile
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3027
#define MAXINTLEN
#define MAXINT8LEN
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3438
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:405
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1853
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3629
static void print_test_list(void)
#define PG_IOLBF
Definition: port.h:344
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQclear(PGresult *res)
Definition: fe-exec.c:680
static void test_simple_pipeline(PGconn *conn)
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3152
#define Assert(condition)
Definition: c.h:804
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1900
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6769
#define strerror
Definition: port.h:229
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1533
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:403
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3113
static const char *const drop_table_sql
char * optarg
Definition: getopt.c:52
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2142
static void test_pipelined_insert(PGconn *conn, int n_rows)
int main(int argc, char **argv)
static void pg_attribute_noreturn()
#define vfprintf
Definition: port.h:219
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6682
#define snprintf
Definition: port.h:216
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6753
long val
Definition: informix.c:664
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
void PQtraceSetFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
static void exit_nicely(PGconn *conn)
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1432
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:718
static void usage(const char *progname)