PostgreSQL Source Code  git master
libpq_pipeline.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_pipeline.c
4  * Verify libpq pipeline execution functionality
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/test/modules/libpq_pipeline/libpq_pipeline.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <sys/select.h>
19 #include <sys/time.h>
20 
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
23 #include "libpq-fe.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
26 
27 
28 static void exit_nicely(PGconn *conn);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32  int numsent);
33 
34 const char *const progname = "libpq_pipeline";
35 
36 /* Options and defaults */
37 char *tracefile = NULL; /* path to PQtrace() file */
38 
39 
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
45 
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 
56 /* max char length of an int32/64, plus sign and null terminator */
57 #define MAXINTLEN 12
58 #define MAXINT8LEN 20
59 
60 static void
62 {
63  PQfinish(conn);
64  exit(1);
65 }
66 
67 /*
68  * Print an error to stderr and terminate the program.
69  */
70 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 static void
73 pg_fatal_impl(int line, const char *fmt,...)
74 {
75  va_list args;
76 
77 
79 
80  fprintf(stderr, "\n%s:%d: ", progname, line);
82  vfprintf(stderr, fmt, args);
84  Assert(fmt[strlen(fmt) - 1] != '\n');
85  fprintf(stderr, "\n");
86  exit(1);
87 }
88 
89 static void
91 {
92  PGresult *res = NULL;
93 
94  fprintf(stderr, "test error cases... ");
95 
96  if (PQisnonblocking(conn))
97  pg_fatal("Expected blocking connection mode");
98 
99  if (PQenterPipelineMode(conn) != 1)
100  pg_fatal("Unable to enter pipeline mode");
101 
103  pg_fatal("Pipeline mode not activated properly");
104 
105  /* PQexec should fail in pipeline mode */
106  res = PQexec(conn, "SELECT 1");
108  pg_fatal("PQexec should fail in pipeline mode but succeeded");
109  if (strcmp(PQerrorMessage(conn),
110  "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111  pg_fatal("did not get expected error message; got: \"%s\"",
113 
114  /* PQsendQuery should fail in pipeline mode */
115  if (PQsendQuery(conn, "SELECT 1") != 0)
116  pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
117  if (strcmp(PQerrorMessage(conn),
118  "PQsendQuery not allowed in pipeline mode\n") != 0)
119  pg_fatal("did not get expected error message; got: \"%s\"",
121 
122  /* Entering pipeline mode when already in pipeline mode is OK */
123  if (PQenterPipelineMode(conn) != 1)
124  pg_fatal("re-entering pipeline mode should be a no-op but failed");
125 
126  if (PQisBusy(conn) != 0)
127  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
128 
129  /* ok, back to normal command mode */
130  if (PQexitPipelineMode(conn) != 1)
131  pg_fatal("couldn't exit idle empty pipeline mode");
132 
134  pg_fatal("Pipeline mode not terminated properly");
135 
136  /* exiting pipeline mode when not in pipeline mode should be a no-op */
137  if (PQexitPipelineMode(conn) != 1)
138  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
139 
140  /* can now PQexec again */
141  res = PQexec(conn, "SELECT 1");
143  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
145 
146  fprintf(stderr, "ok\n");
147 }
148 
149 static void
151 {
152  PGresult *res = NULL;
153  const char *dummy_params[1] = {"1"};
154  Oid dummy_param_oids[1] = {INT4OID};
155 
156  fprintf(stderr, "multi pipeline... ");
157 
158  /*
159  * Queue up a couple of small pipelines and process each without returning
160  * to command mode first.
161  */
162  if (PQenterPipelineMode(conn) != 1)
163  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164 
165  /* first pipeline */
166  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
167  dummy_params, NULL, NULL, 0) != 1)
168  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
169 
170  if (PQpipelineSync(conn) != 1)
171  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
172 
173  /* second pipeline */
174  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
175  dummy_params, NULL, NULL, 0) != 1)
176  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
177 
178  /* Skip flushing once. */
179  if (PQsendPipelineSync(conn) != 1)
180  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
181 
182  /* third pipeline */
183  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
184  dummy_params, NULL, NULL, 0) != 1)
185  pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
186 
187  if (PQpipelineSync(conn) != 1)
188  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
189 
190  /* OK, start processing the results */
191 
192  /* first pipeline */
193 
194  res = PQgetResult(conn);
195  if (res == NULL)
196  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
198 
200  pg_fatal("Unexpected result code %s from first pipeline item",
202  PQclear(res);
203  res = NULL;
204 
205  if (PQgetResult(conn) != NULL)
206  pg_fatal("PQgetResult returned something extra after first result");
207 
208  if (PQexitPipelineMode(conn) != 0)
209  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
210 
211  res = PQgetResult(conn);
212  if (res == NULL)
213  pg_fatal("PQgetResult returned null when sync result expected: %s",
215 
217  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
219  PQclear(res);
220 
221  /* second pipeline */
222 
223  res = PQgetResult(conn);
224  if (res == NULL)
225  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
227 
229  pg_fatal("Unexpected result code %s from second pipeline item",
231  PQclear(res);
232  res = NULL;
233 
234  if (PQgetResult(conn) != NULL)
235  pg_fatal("PQgetResult returned something extra after first result");
236 
237  if (PQexitPipelineMode(conn) != 0)
238  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
239 
240  res = PQgetResult(conn);
241  if (res == NULL)
242  pg_fatal("PQgetResult returned null when sync result expected: %s",
244 
246  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
248  PQclear(res);
249 
250  /* third pipeline */
251 
252  res = PQgetResult(conn);
253  if (res == NULL)
254  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
256 
258  pg_fatal("Unexpected result code %s from third pipeline item",
260 
261  res = PQgetResult(conn);
262  if (res != NULL)
263  pg_fatal("Expected null result, got %s",
265 
266  res = PQgetResult(conn);
267  if (res == NULL)
268  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
270 
272  pg_fatal("Unexpected result code %s from second pipeline sync",
274 
275  /* We're still in pipeline mode ... */
277  pg_fatal("Fell out of pipeline mode somehow");
278 
279  /* until we end it, which we can safely do now */
280  if (PQexitPipelineMode(conn) != 1)
281  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
283 
285  pg_fatal("exiting pipeline mode didn't seem to work");
286 
287  fprintf(stderr, "ok\n");
288 }
289 
290 /*
291  * Test behavior when a pipeline dispatches a number of commands that are
292  * not flushed by a sync point.
293  */
294 static void
296 {
297  int numqueries = 10;
298  int results = 0;
299  int sock = PQsocket(conn);
300 
301  fprintf(stderr, "nosync... ");
302 
303  if (sock < 0)
304  pg_fatal("invalid socket");
305 
306  if (PQenterPipelineMode(conn) != 1)
307  pg_fatal("could not enter pipeline mode");
308  for (int i = 0; i < numqueries; i++)
309  {
310  fd_set input_mask;
311  struct timeval tv;
312 
313  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
314  0, NULL, NULL, NULL, NULL, 0) != 1)
315  pg_fatal("error sending select: %s", PQerrorMessage(conn));
316  PQflush(conn);
317 
318  /*
319  * If the server has written anything to us, read (some of) it now.
320  */
321  FD_ZERO(&input_mask);
322  FD_SET(sock, &input_mask);
323  tv.tv_sec = 0;
324  tv.tv_usec = 0;
325  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
326  {
327  fprintf(stderr, "select() failed: %s\n", strerror(errno));
328  exit_nicely(conn);
329  }
330  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
331  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
332  }
333 
334  /* tell server to flush its output buffer */
335  if (PQsendFlushRequest(conn) != 1)
336  pg_fatal("failed to send flush request");
337  PQflush(conn);
338 
339  /* Now read all results */
340  for (;;)
341  {
342  PGresult *res;
343 
344  res = PQgetResult(conn);
345 
346  /* NULL results are only expected after TUPLES_OK */
347  if (res == NULL)
348  pg_fatal("got unexpected NULL result after %d results", results);
349 
350  /* We expect exactly one TUPLES_OK result for each query we sent */
352  {
353  PGresult *res2;
354 
355  /* and one NULL result should follow each */
356  res2 = PQgetResult(conn);
357  if (res2 != NULL)
358  pg_fatal("expected NULL, got %s",
359  PQresStatus(PQresultStatus(res2)));
360  PQclear(res);
361  results++;
362 
363  /* if we're done, we're done */
364  if (results == numqueries)
365  break;
366 
367  continue;
368  }
369 
370  /* anything else is unexpected */
371  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
372  }
373 
374  fprintf(stderr, "ok\n");
375 }
376 
377 /*
378  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
379  * still have to get results for each pipeline item, but the item will just be
380  * a PGRES_PIPELINE_ABORTED code.
381  *
382  * This intentionally doesn't use a transaction to wrap the pipeline. You should
383  * usually use an xact, but in this case we want to observe the effects of each
384  * statement.
385  */
386 static void
388 {
389  PGresult *res = NULL;
390  const char *dummy_params[1] = {"1"};
391  Oid dummy_param_oids[1] = {INT4OID};
392  int i;
393  int gotrows;
394  bool goterror;
395 
396  fprintf(stderr, "aborted pipeline... ");
397 
400  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
401 
404  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
405 
406  /*
407  * Queue up a couple of small pipelines and process each without returning
408  * to command mode first. Make sure the second operation in the first
409  * pipeline ERRORs.
410  */
411  if (PQenterPipelineMode(conn) != 1)
412  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
413 
414  dummy_params[0] = "1";
415  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
416  dummy_params, NULL, NULL, 0) != 1)
417  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
418 
419  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
420  1, dummy_param_oids, dummy_params,
421  NULL, NULL, 0) != 1)
422  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
423 
424  dummy_params[0] = "2";
425  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
426  dummy_params, NULL, NULL, 0) != 1)
427  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
428 
429  if (PQpipelineSync(conn) != 1)
430  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
431 
432  dummy_params[0] = "3";
433  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
434  dummy_params, NULL, NULL, 0) != 1)
435  pg_fatal("dispatching second-pipeline insert failed: %s",
437 
438  if (PQpipelineSync(conn) != 1)
439  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
440 
441  /*
442  * OK, start processing the pipeline results.
443  *
444  * We should get a command-ok for the first query, then a fatal error and
445  * a pipeline aborted message for the second insert, a pipeline-end, then
446  * a command-ok and a pipeline-ok for the second pipeline operation.
447  */
448  res = PQgetResult(conn);
449  if (res == NULL)
450  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
452  pg_fatal("Unexpected result status %s: %s",
455  PQclear(res);
456 
457  /* NULL result to signal end-of-results for this command */
458  if ((res = PQgetResult(conn)) != NULL)
459  pg_fatal("Expected null result, got %s",
461 
462  /* Second query caused error, so we expect an error next */
463  res = PQgetResult(conn);
464  if (res == NULL)
465  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
467  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
469  PQclear(res);
470 
471  /* NULL result to signal end-of-results for this command */
472  if ((res = PQgetResult(conn)) != NULL)
473  pg_fatal("Expected null result, got %s",
475 
476  /*
477  * pipeline should now be aborted.
478  *
479  * Note that we could still queue more queries at this point if we wanted;
480  * they'd get added to a new third pipeline since we've already sent a
481  * second. The aborted flag relates only to the pipeline being received.
482  */
484  pg_fatal("pipeline should be flagged as aborted but isn't");
485 
486  /* third query in pipeline, the second insert */
487  res = PQgetResult(conn);
488  if (res == NULL)
489  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
491  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
493  PQclear(res);
494 
495  /* NULL result to signal end-of-results for this command */
496  if ((res = PQgetResult(conn)) != NULL)
497  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
498 
500  pg_fatal("pipeline should be flagged as aborted but isn't");
501 
502  /* Ensure we're still in pipeline */
504  pg_fatal("Fell out of pipeline mode somehow");
505 
506  /*
507  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
508  *
509  * (This is so clients know to start processing results normally again and
510  * can tell the difference between skipped commands and the sync.)
511  */
512  res = PQgetResult(conn);
513  if (res == NULL)
514  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
516  pg_fatal("Unexpected result code from first pipeline sync\n"
517  "Expected PGRES_PIPELINE_SYNC, got %s",
519  PQclear(res);
520 
522  pg_fatal("sync should've cleared the aborted flag but didn't");
523 
524  /* We're still in pipeline mode... */
526  pg_fatal("Fell out of pipeline mode somehow");
527 
528  /* the insert from the second pipeline */
529  res = PQgetResult(conn);
530  if (res == NULL)
531  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
533  pg_fatal("Unexpected result code %s from first item in second pipeline",
535  PQclear(res);
536 
537  /* Read the NULL result at the end of the command */
538  if ((res = PQgetResult(conn)) != NULL)
539  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
540 
541  /* the second pipeline sync */
542  if ((res = PQgetResult(conn)) == NULL)
543  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
545  pg_fatal("Unexpected result code %s from second pipeline sync",
547  PQclear(res);
548 
549  if ((res = PQgetResult(conn)) != NULL)
550  pg_fatal("Expected null result, got %s: %s",
553 
554  /* Try to send two queries in one command */
555  if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
556  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
557  if (PQpipelineSync(conn) != 1)
558  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
559  goterror = false;
560  while ((res = PQgetResult(conn)) != NULL)
561  {
562  switch (PQresultStatus(res))
563  {
564  case PGRES_FATAL_ERROR:
565  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
566  pg_fatal("expected error about multiple commands, got %s",
568  printf("got expected %s", PQerrorMessage(conn));
569  goterror = true;
570  break;
571  default:
572  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
573  break;
574  }
575  }
576  if (!goterror)
577  pg_fatal("did not get cannot-insert-multiple-commands error");
578  res = PQgetResult(conn);
579  if (res == NULL)
580  pg_fatal("got NULL result");
582  pg_fatal("Unexpected result code %s from pipeline sync",
584  fprintf(stderr, "ok\n");
585 
586  /* Test single-row mode with an error partways */
587  if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
588  0, NULL, NULL, NULL, NULL, 0) != 1)
589  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
590  if (PQpipelineSync(conn) != 1)
591  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
593  goterror = false;
594  gotrows = 0;
595  while ((res = PQgetResult(conn)) != NULL)
596  {
597  switch (PQresultStatus(res))
598  {
599  case PGRES_SINGLE_TUPLE:
600  printf("got row: %s\n", PQgetvalue(res, 0, 0));
601  gotrows++;
602  break;
603  case PGRES_FATAL_ERROR:
604  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
605  pg_fatal("expected division-by-zero, got: %s (%s)",
608  printf("got expected division-by-zero\n");
609  goterror = true;
610  break;
611  default:
612  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
613  }
614  PQclear(res);
615  }
616  if (!goterror)
617  pg_fatal("did not get division-by-zero error");
618  if (gotrows != 3)
619  pg_fatal("did not get three rows");
620  /* the third pipeline sync */
621  if ((res = PQgetResult(conn)) == NULL)
622  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
624  pg_fatal("Unexpected result code %s from third pipeline sync",
626  PQclear(res);
627 
628  /* We're still in pipeline mode... */
630  pg_fatal("Fell out of pipeline mode somehow");
631 
632  /* until we end it, which we can safely do now */
633  if (PQexitPipelineMode(conn) != 1)
634  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
636 
638  pg_fatal("exiting pipeline mode didn't seem to work");
639 
640  /*-
641  * Since we fired the pipelines off without a surrounding xact, the results
642  * should be:
643  *
644  * - Implicit xact started by server around 1st pipeline
645  * - First insert applied
646  * - Second statement aborted xact
647  * - Third insert skipped
648  * - Sync rolled back first implicit xact
649  * - Implicit xact created by server around 2nd pipeline
650  * - insert applied from 2nd pipeline
651  * - Sync commits 2nd xact
652  *
653  * So we should only have the value 3 that we inserted.
654  */
655  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
656 
658  pg_fatal("Expected tuples, got %s: %s",
660  if (PQntuples(res) != 1)
661  pg_fatal("expected 1 result, got %d", PQntuples(res));
662  for (i = 0; i < PQntuples(res); i++)
663  {
664  const char *val = PQgetvalue(res, i, 0);
665 
666  if (strcmp(val, "3") != 0)
667  pg_fatal("expected only insert with value 3, got %s", val);
668  }
669 
670  PQclear(res);
671 
672  fprintf(stderr, "ok\n");
673 }
674 
675 /* State machine enum for test_pipelined_insert */
677 {
686 };
687 
688 static void
690 {
691  Oid insert_param_oids[2] = {INT4OID, INT8OID};
692  const char *insert_params[2];
693  char insert_param_0[MAXINTLEN];
694  char insert_param_1[MAXINT8LEN];
695  enum PipelineInsertStep send_step = BI_BEGIN_TX,
696  recv_step = BI_BEGIN_TX;
697  int rows_to_send,
698  rows_to_receive;
699 
700  insert_params[0] = insert_param_0;
701  insert_params[1] = insert_param_1;
702 
703  rows_to_send = rows_to_receive = n_rows;
704 
705  /*
706  * Do a pipelined insert into a table created at the start of the pipeline
707  */
708  if (PQenterPipelineMode(conn) != 1)
709  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
710 
711  while (send_step != BI_PREPARE)
712  {
713  const char *sql;
714 
715  switch (send_step)
716  {
717  case BI_BEGIN_TX:
718  sql = "BEGIN TRANSACTION";
719  send_step = BI_DROP_TABLE;
720  break;
721 
722  case BI_DROP_TABLE:
723  sql = drop_table_sql;
724  send_step = BI_CREATE_TABLE;
725  break;
726 
727  case BI_CREATE_TABLE:
728  sql = create_table_sql;
729  send_step = BI_PREPARE;
730  break;
731 
732  default:
733  pg_fatal("invalid state");
734  sql = NULL; /* keep compiler quiet */
735  }
736 
737  pg_debug("sending: %s\n", sql);
738  if (PQsendQueryParams(conn, sql,
739  0, NULL, NULL, NULL, NULL, 0) != 1)
740  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
741  }
742 
743  Assert(send_step == BI_PREPARE);
744  pg_debug("sending: %s\n", insert_sql2);
745  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
746  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
747  send_step = BI_INSERT_ROWS;
748 
749  /*
750  * Now we start inserting. We'll be sending enough data that we could fill
751  * our output buffer, so to avoid deadlocking we need to enter nonblocking
752  * mode and consume input while we send more output. As results of each
753  * query are processed we should pop them to allow processing of the next
754  * query. There's no need to finish the pipeline before processing
755  * results.
756  */
757  if (PQsetnonblocking(conn, 1) != 0)
758  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
759 
760  while (recv_step != BI_DONE)
761  {
762  int sock;
763  fd_set input_mask;
764  fd_set output_mask;
765 
766  sock = PQsocket(conn);
767 
768  if (sock < 0)
769  break; /* shouldn't happen */
770 
771  FD_ZERO(&input_mask);
772  FD_SET(sock, &input_mask);
773  FD_ZERO(&output_mask);
774  FD_SET(sock, &output_mask);
775 
776  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
777  {
778  fprintf(stderr, "select() failed: %s\n", strerror(errno));
779  exit_nicely(conn);
780  }
781 
782  /*
783  * Process any results, so we keep the server's output buffer free
784  * flowing and it can continue to process input
785  */
786  if (FD_ISSET(sock, &input_mask))
787  {
789 
790  /* Read until we'd block if we tried to read */
791  while (!PQisBusy(conn) && recv_step < BI_DONE)
792  {
793  PGresult *res;
794  const char *cmdtag = "";
795  const char *description = "";
796  int status;
797 
798  /*
799  * Read next result. If no more results from this query,
800  * advance to the next query
801  */
802  res = PQgetResult(conn);
803  if (res == NULL)
804  continue;
805 
806  status = PGRES_COMMAND_OK;
807  switch (recv_step)
808  {
809  case BI_BEGIN_TX:
810  cmdtag = "BEGIN";
811  recv_step++;
812  break;
813  case BI_DROP_TABLE:
814  cmdtag = "DROP TABLE";
815  recv_step++;
816  break;
817  case BI_CREATE_TABLE:
818  cmdtag = "CREATE TABLE";
819  recv_step++;
820  break;
821  case BI_PREPARE:
822  cmdtag = "";
823  description = "PREPARE";
824  recv_step++;
825  break;
826  case BI_INSERT_ROWS:
827  cmdtag = "INSERT";
828  rows_to_receive--;
829  if (rows_to_receive == 0)
830  recv_step++;
831  break;
832  case BI_COMMIT_TX:
833  cmdtag = "COMMIT";
834  recv_step++;
835  break;
836  case BI_SYNC:
837  cmdtag = "";
838  description = "SYNC";
839  status = PGRES_PIPELINE_SYNC;
840  recv_step++;
841  break;
842  case BI_DONE:
843  /* unreachable */
844  pg_fatal("unreachable state");
845  }
846 
847  if (PQresultStatus(res) != status)
848  pg_fatal("%s reported status %s, expected %s\n"
849  "Error message: \"%s\"",
851  PQresStatus(status), PQerrorMessage(conn));
852 
853  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
854  pg_fatal("%s expected command tag '%s', got '%s'",
855  description, cmdtag, PQcmdStatus(res));
856 
857  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
858 
859  PQclear(res);
860  }
861  }
862 
863  /* Write more rows and/or the end pipeline message, if needed */
864  if (FD_ISSET(sock, &output_mask))
865  {
866  PQflush(conn);
867 
868  if (send_step == BI_INSERT_ROWS)
869  {
870  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
871  /* use up some buffer space with a wide value */
872  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
873 
874  if (PQsendQueryPrepared(conn, "my_insert",
875  2, insert_params, NULL, NULL, 0) == 1)
876  {
877  pg_debug("sent row %d\n", rows_to_send);
878 
879  rows_to_send--;
880  if (rows_to_send == 0)
881  send_step++;
882  }
883  else
884  {
885  /*
886  * in nonblocking mode, so it's OK for an insert to fail
887  * to send
888  */
889  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
890  rows_to_send, PQerrorMessage(conn));
891  }
892  }
893  else if (send_step == BI_COMMIT_TX)
894  {
895  if (PQsendQueryParams(conn, "COMMIT",
896  0, NULL, NULL, NULL, NULL, 0) == 1)
897  {
898  pg_debug("sent COMMIT\n");
899  send_step++;
900  }
901  else
902  {
903  fprintf(stderr, "WARNING: failed to send commit: %s\n",
905  }
906  }
907  else if (send_step == BI_SYNC)
908  {
909  if (PQpipelineSync(conn) == 1)
910  {
911  fprintf(stdout, "pipeline sync sent\n");
912  send_step++;
913  }
914  else
915  {
916  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
918  }
919  }
920  }
921  }
922 
923  /* We've got the sync message and the pipeline should be done */
924  if (PQexitPipelineMode(conn) != 1)
925  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
927 
928  if (PQsetnonblocking(conn, 0) != 0)
929  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
930 
931  fprintf(stderr, "ok\n");
932 }
933 
934 static void
936 {
937  PGresult *res = NULL;
938  Oid param_oids[1] = {INT4OID};
939  Oid expected_oids[4];
940  Oid typ;
941 
942  fprintf(stderr, "prepared... ");
943 
944  if (PQenterPipelineMode(conn) != 1)
945  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
946  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
947  "interval '1 sec'",
948  1, param_oids) != 1)
949  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
950  expected_oids[0] = INT4OID;
951  expected_oids[1] = TEXTOID;
952  expected_oids[2] = NUMERICOID;
953  expected_oids[3] = INTERVALOID;
954  if (PQsendDescribePrepared(conn, "select_one") != 1)
955  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
956  if (PQpipelineSync(conn) != 1)
957  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
958 
959  res = PQgetResult(conn);
960  if (res == NULL)
961  pg_fatal("PQgetResult returned null");
963  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
964  PQclear(res);
965  res = PQgetResult(conn);
966  if (res != NULL)
967  pg_fatal("expected NULL result");
968 
969  res = PQgetResult(conn);
970  if (res == NULL)
971  pg_fatal("PQgetResult returned NULL");
973  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
974  if (PQnfields(res) != lengthof(expected_oids))
975  pg_fatal("expected %zu columns, got %d",
976  lengthof(expected_oids), PQnfields(res));
977  for (int i = 0; i < PQnfields(res); i++)
978  {
979  typ = PQftype(res, i);
980  if (typ != expected_oids[i])
981  pg_fatal("field %d: expected type %u, got %u",
982  i, expected_oids[i], typ);
983  }
984  PQclear(res);
985  res = PQgetResult(conn);
986  if (res != NULL)
987  pg_fatal("expected NULL result");
988 
989  res = PQgetResult(conn);
991  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
992 
993  fprintf(stderr, "closing statement..");
994  if (PQsendClosePrepared(conn, "select_one") != 1)
995  pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
996  if (PQpipelineSync(conn) != 1)
997  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
998 
999  res = PQgetResult(conn);
1000  if (res == NULL)
1001  pg_fatal("expected non-NULL result");
1003  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1004  PQclear(res);
1005  res = PQgetResult(conn);
1006  if (res != NULL)
1007  pg_fatal("expected NULL result");
1008  res = PQgetResult(conn);
1010  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1011 
1012  if (PQexitPipelineMode(conn) != 1)
1013  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1014 
1015  /* Now that it's closed we should get an error when describing */
1016  res = PQdescribePrepared(conn, "select_one");
1018  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1019 
1020  /*
1021  * Also test the blocking close, this should not fail since closing a
1022  * non-existent prepared statement is a no-op
1023  */
1024  res = PQclosePrepared(conn, "select_one");
1026  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1027 
1028  fprintf(stderr, "creating portal... ");
1029  PQexec(conn, "BEGIN");
1030  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1032  if (PQsendDescribePortal(conn, "cursor_one") != 1)
1033  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1034  if (PQpipelineSync(conn) != 1)
1035  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1036  res = PQgetResult(conn);
1037  if (res == NULL)
1038  pg_fatal("PQgetResult returned null");
1040  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1041 
1042  typ = PQftype(res, 0);
1043  if (typ != INT4OID)
1044  pg_fatal("portal: expected type %u, got %u",
1045  INT4OID, typ);
1046  PQclear(res);
1047  res = PQgetResult(conn);
1048  if (res != NULL)
1049  pg_fatal("expected NULL result");
1050  res = PQgetResult(conn);
1052  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1053 
1054  fprintf(stderr, "closing portal... ");
1055  if (PQsendClosePortal(conn, "cursor_one") != 1)
1056  pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1057  if (PQpipelineSync(conn) != 1)
1058  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1059 
1060  res = PQgetResult(conn);
1061  if (res == NULL)
1062  pg_fatal("expected non-NULL result");
1064  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1065  PQclear(res);
1066  res = PQgetResult(conn);
1067  if (res != NULL)
1068  pg_fatal("expected NULL result");
1069  res = PQgetResult(conn);
1071  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1072 
1073  if (PQexitPipelineMode(conn) != 1)
1074  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1075 
1076  /* Now that it's closed we should get an error when describing */
1077  res = PQdescribePortal(conn, "cursor_one");
1079  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1080 
1081  /*
1082  * Also test the blocking close, this should not fail since closing a
1083  * non-existent portal is a no-op
1084  */
1085  res = PQclosePortal(conn, "cursor_one");
1087  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1088 
1089  fprintf(stderr, "ok\n");
1090 }
1091 
1092 /* Notice processor: print notices, and count how many we got */
1093 static void
1094 notice_processor(void *arg, const char *message)
1095 {
1096  int *n_notices = (int *) arg;
1097 
1098  (*n_notices)++;
1099  fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1100 }
1101 
1102 /* Verify behavior in "idle" state */
1103 static void
1105 {
1106  PGresult *res;
1107  int n_notices = 0;
1108 
1109  fprintf(stderr, "\npipeline idle...\n");
1110 
1112 
1113  /* Try to exit pipeline mode in pipeline-idle state */
1114  if (PQenterPipelineMode(conn) != 1)
1115  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1116  if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1117  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1119  res = PQgetResult(conn);
1120  if (res == NULL)
1121  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1122  PQerrorMessage(conn));
1124  pg_fatal("unexpected result code %s from first pipeline item",
1126  PQclear(res);
1127  res = PQgetResult(conn);
1128  if (res != NULL)
1129  pg_fatal("did not receive terminating NULL");
1130  if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1131  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1132  if (PQexitPipelineMode(conn) == 1)
1133  pg_fatal("exiting pipeline succeeded when it shouldn't");
1134  if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1135  strlen("cannot exit pipeline mode")) != 0)
1136  pg_fatal("did not get expected error; got: %s",
1137  PQerrorMessage(conn));
1139  res = PQgetResult(conn);
1141  pg_fatal("unexpected result code %s from second pipeline item",
1143  PQclear(res);
1144  res = PQgetResult(conn);
1145  if (res != NULL)
1146  pg_fatal("did not receive terminating NULL");
1147  if (PQexitPipelineMode(conn) != 1)
1148  pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1149 
1150  if (n_notices > 0)
1151  pg_fatal("got %d notice(s)", n_notices);
1152  fprintf(stderr, "ok - 1\n");
1153 
1154  /* Have a WARNING in the middle of a resultset */
1155  if (PQenterPipelineMode(conn) != 1)
1156  pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1157  if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1158  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1160  res = PQgetResult(conn);
1161  if (res == NULL)
1162  pg_fatal("unexpected NULL result received");
1164  pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1165  if (PQexitPipelineMode(conn) != 1)
1166  pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1167  fprintf(stderr, "ok - 2\n");
1168 }
1169 
1170 static void
1172 {
1173  PGresult *res = NULL;
1174  const char *dummy_params[1] = {"1"};
1175  Oid dummy_param_oids[1] = {INT4OID};
1176 
1177  fprintf(stderr, "simple pipeline... ");
1178 
1179  /*
1180  * Enter pipeline mode and dispatch a set of operations, which we'll then
1181  * process the results of as they come in.
1182  *
1183  * For a simple case we should be able to do this without interim
1184  * processing of results since our output buffer will give us enough slush
1185  * to work with and we won't block on sending. So blocking mode is fine.
1186  */
1187  if (PQisnonblocking(conn))
1188  pg_fatal("Expected blocking connection mode");
1189 
1190  if (PQenterPipelineMode(conn) != 1)
1191  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1192 
1193  if (PQsendQueryParams(conn, "SELECT $1",
1194  1, dummy_param_oids, dummy_params,
1195  NULL, NULL, 0) != 1)
1196  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1197 
1198  if (PQexitPipelineMode(conn) != 0)
1199  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1200 
1201  if (PQpipelineSync(conn) != 1)
1202  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1203 
1204  res = PQgetResult(conn);
1205  if (res == NULL)
1206  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1207  PQerrorMessage(conn));
1208 
1210  pg_fatal("Unexpected result code %s from first pipeline item",
1212 
1213  PQclear(res);
1214  res = NULL;
1215 
1216  if (PQgetResult(conn) != NULL)
1217  pg_fatal("PQgetResult returned something extra after first query result.");
1218 
1219  /*
1220  * Even though we've processed the result there's still a sync to come and
1221  * we can't exit pipeline mode yet
1222  */
1223  if (PQexitPipelineMode(conn) != 0)
1224  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1225 
1226  res = PQgetResult(conn);
1227  if (res == NULL)
1228  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1229  PQerrorMessage(conn));
1230 
1232  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1234 
1235  PQclear(res);
1236  res = NULL;
1237 
1238  if (PQgetResult(conn) != NULL)
1239  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1241 
1242  /* We're still in pipeline mode... */
1244  pg_fatal("Fell out of pipeline mode somehow");
1245 
1246  /* ... until we end it, which we can safely do now */
1247  if (PQexitPipelineMode(conn) != 1)
1248  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1249  PQerrorMessage(conn));
1250 
1252  pg_fatal("Exiting pipeline mode didn't seem to work");
1253 
1254  fprintf(stderr, "ok\n");
1255 }
1256 
1257 static void
1259 {
1260  PGresult *res;
1261  int i;
1262  bool pipeline_ended = false;
1263 
1264  if (PQenterPipelineMode(conn) != 1)
1265  pg_fatal("failed to enter pipeline mode: %s",
1266  PQerrorMessage(conn));
1267 
1268  /* One series of three commands, using single-row mode for the first two. */
1269  for (i = 0; i < 3; i++)
1270  {
1271  char *param[1];
1272 
1273  param[0] = psprintf("%d", 44 + i);
1274 
1275  if (PQsendQueryParams(conn,
1276  "SELECT generate_series(42, $1)",
1277  1,
1278  NULL,
1279  (const char **) param,
1280  NULL,
1281  NULL,
1282  0) != 1)
1283  pg_fatal("failed to send query: %s",
1284  PQerrorMessage(conn));
1285  pfree(param[0]);
1286  }
1287  if (PQpipelineSync(conn) != 1)
1288  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1289 
1290  for (i = 0; !pipeline_ended; i++)
1291  {
1292  bool first = true;
1293  bool saw_ending_tuplesok;
1294  bool isSingleTuple = false;
1295 
1296  /* Set single row mode for only first 2 SELECT queries */
1297  if (i < 2)
1298  {
1299  if (PQsetSingleRowMode(conn) != 1)
1300  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1301  }
1302 
1303  /* Consume rows for this query */
1304  saw_ending_tuplesok = false;
1305  while ((res = PQgetResult(conn)) != NULL)
1306  {
1308 
1309  if (est == PGRES_PIPELINE_SYNC)
1310  {
1311  fprintf(stderr, "end of pipeline reached\n");
1312  pipeline_ended = true;
1313  PQclear(res);
1314  if (i != 3)
1315  pg_fatal("Expected three results, got %d", i);
1316  break;
1317  }
1318 
1319  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1320  if (first)
1321  {
1322  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1323  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1324  i, PQresStatus(est));
1325  if (i >= 2 && est != PGRES_TUPLES_OK)
1326  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1327  i, PQresStatus(est));
1328  first = false;
1329  }
1330 
1331  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1332  switch (est)
1333  {
1334  case PGRES_TUPLES_OK:
1335  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1336  saw_ending_tuplesok = true;
1337  if (isSingleTuple)
1338  {
1339  if (PQntuples(res) == 0)
1340  fprintf(stderr, "all tuples received in query %d\n", i);
1341  else
1342  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1343  }
1344  break;
1345 
1346  case PGRES_SINGLE_TUPLE:
1347  isSingleTuple = true;
1348  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1349  break;
1350 
1351  default:
1352  pg_fatal("unexpected");
1353  }
1354  PQclear(res);
1355  }
1356  if (!pipeline_ended && !saw_ending_tuplesok)
1357  pg_fatal("didn't get expected terminating TUPLES_OK");
1358  }
1359 
1360  /*
1361  * Now issue one command, get its results in with single-row mode, then
1362  * issue another command, and get its results in normal mode; make sure
1363  * the single-row mode flag is reset as expected.
1364  */
1365  if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1366  0, NULL, NULL, NULL, NULL, 0) != 1)
1367  pg_fatal("failed to send query: %s",
1368  PQerrorMessage(conn));
1369  if (PQsendFlushRequest(conn) != 1)
1370  pg_fatal("failed to send flush request");
1371  if (PQsetSingleRowMode(conn) != 1)
1372  pg_fatal("PQsetSingleRowMode() failed");
1373  res = PQgetResult(conn);
1374  if (res == NULL)
1375  pg_fatal("unexpected NULL");
1377  pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1379  res = PQgetResult(conn);
1380  if (res == NULL)
1381  pg_fatal("unexpected NULL");
1383  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1385  if (PQgetResult(conn) != NULL)
1386  pg_fatal("expected NULL result");
1387 
1388  if (PQsendQueryParams(conn, "SELECT 1",
1389  0, NULL, NULL, NULL, NULL, 0) != 1)
1390  pg_fatal("failed to send query: %s",
1391  PQerrorMessage(conn));
1392  if (PQsendFlushRequest(conn) != 1)
1393  pg_fatal("failed to send flush request");
1394  res = PQgetResult(conn);
1395  if (res == NULL)
1396  pg_fatal("unexpected NULL");
1398  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1400  if (PQgetResult(conn) != NULL)
1401  pg_fatal("expected NULL result");
1402 
1403  if (PQexitPipelineMode(conn) != 1)
1404  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1405 
1406  fprintf(stderr, "ok\n");
1407 }
1408 
1409 /*
1410  * Simple test to verify that a pipeline is discarded as a whole when there's
1411  * an error, ignoring transaction commands.
1412  */
1413 static void
1415 {
1416  PGresult *res;
1417  bool expect_null;
1418  int num_syncs = 0;
1419 
1420  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1421  "CREATE TABLE pq_pipeline_tst (id int)");
1423  pg_fatal("failed to create test table: %s",
1424  PQerrorMessage(conn));
1425  PQclear(res);
1426 
1427  if (PQenterPipelineMode(conn) != 1)
1428  pg_fatal("failed to enter pipeline mode: %s",
1429  PQerrorMessage(conn));
1430  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1431  pg_fatal("could not send prepare on pipeline: %s",
1432  PQerrorMessage(conn));
1433 
1434  if (PQsendQueryParams(conn,
1435  "BEGIN",
1436  0, NULL, NULL, NULL, NULL, 0) != 1)
1437  pg_fatal("failed to send query: %s",
1438  PQerrorMessage(conn));
1439  if (PQsendQueryParams(conn,
1440  "SELECT 0/0",
1441  0, NULL, NULL, NULL, NULL, 0) != 1)
1442  pg_fatal("failed to send query: %s",
1443  PQerrorMessage(conn));
1444 
1445  /*
1446  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1447  * get out of the pipeline-aborted state first.
1448  */
1449  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1450  pg_fatal("failed to execute prepared: %s",
1451  PQerrorMessage(conn));
1452 
1453  /* This insert fails because we're in pipeline-aborted state */
1454  if (PQsendQueryParams(conn,
1455  "INSERT INTO pq_pipeline_tst VALUES (1)",
1456  0, NULL, NULL, NULL, NULL, 0) != 1)
1457  pg_fatal("failed to send query: %s",
1458  PQerrorMessage(conn));
1459  if (PQpipelineSync(conn) != 1)
1460  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1461  num_syncs++;
1462 
1463  /*
1464  * This insert fails even though the pipeline got a SYNC, because we're in
1465  * an aborted transaction
1466  */
1467  if (PQsendQueryParams(conn,
1468  "INSERT INTO pq_pipeline_tst VALUES (2)",
1469  0, NULL, NULL, NULL, NULL, 0) != 1)
1470  pg_fatal("failed to send query: %s",
1471  PQerrorMessage(conn));
1472  if (PQpipelineSync(conn) != 1)
1473  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1474  num_syncs++;
1475 
1476  /*
1477  * Send ROLLBACK using prepared stmt. This one works because we just did
1478  * PQpipelineSync above.
1479  */
1480  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1481  pg_fatal("failed to execute prepared: %s",
1482  PQerrorMessage(conn));
1483 
1484  /*
1485  * Now that we're out of a transaction and in pipeline-good mode, this
1486  * insert works
1487  */
1488  if (PQsendQueryParams(conn,
1489  "INSERT INTO pq_pipeline_tst VALUES (3)",
1490  0, NULL, NULL, NULL, NULL, 0) != 1)
1491  pg_fatal("failed to send query: %s",
1492  PQerrorMessage(conn));
1493  /* Send two syncs now -- match up to SYNC messages below */
1494  if (PQpipelineSync(conn) != 1)
1495  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1496  num_syncs++;
1497  if (PQpipelineSync(conn) != 1)
1498  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1499  num_syncs++;
1500 
1501  expect_null = false;
1502  for (int i = 0;; i++)
1503  {
1504  ExecStatusType restype;
1505 
1506  res = PQgetResult(conn);
1507  if (res == NULL)
1508  {
1509  printf("%d: got NULL result\n", i);
1510  if (!expect_null)
1511  pg_fatal("did not expect NULL here");
1512  expect_null = false;
1513  continue;
1514  }
1515  restype = PQresultStatus(res);
1516  printf("%d: got status %s", i, PQresStatus(restype));
1517  if (expect_null)
1518  pg_fatal("expected NULL");
1519  if (restype == PGRES_FATAL_ERROR)
1520  printf("; error: %s", PQerrorMessage(conn));
1521  else if (restype == PGRES_PIPELINE_ABORTED)
1522  {
1523  printf(": command didn't run because pipeline aborted\n");
1524  }
1525  else
1526  printf("\n");
1527  PQclear(res);
1528 
1529  if (restype == PGRES_PIPELINE_SYNC)
1530  num_syncs--;
1531  else
1532  expect_null = true;
1533  if (num_syncs <= 0)
1534  break;
1535  }
1536  if (PQgetResult(conn) != NULL)
1537  pg_fatal("returned something extra after all the syncs: %s",
1539 
1540  if (PQexitPipelineMode(conn) != 1)
1541  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1542 
1543  /* We expect to find one tuple containing the value "3" */
1544  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1546  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1547  if (PQntuples(res) != 1)
1548  pg_fatal("did not get 1 tuple");
1549  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1550  pg_fatal("did not get expected tuple");
1551  PQclear(res);
1552 
1553  fprintf(stderr, "ok\n");
1554 }
1555 
1556 /*
1557  * In this test mode we send a stream of queries, with one in the middle
1558  * causing an error. Verify that we can still send some more after the
1559  * error and have libpq work properly.
1560  */
1561 static void
1563 {
1564  int sock = PQsocket(conn);
1565  PGresult *res;
1566  Oid paramTypes[2] = {INT8OID, INT8OID};
1567  const char *paramValues[2];
1568  char paramValue0[MAXINT8LEN];
1569  char paramValue1[MAXINT8LEN];
1570  int ctr = 0;
1571  int numsent = 0;
1572  int results = 0;
1573  bool read_done = false;
1574  bool write_done = false;
1575  bool error_sent = false;
1576  bool got_error = false;
1577  int switched = 0;
1578  int socketful = 0;
1579  fd_set in_fds;
1580  fd_set out_fds;
1581 
1582  fprintf(stderr, "uniqviol ...");
1583 
1584  PQsetnonblocking(conn, 1);
1585 
1586  paramValues[0] = paramValue0;
1587  paramValues[1] = paramValue1;
1588  sprintf(paramValue1, "42");
1589 
1590  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1591  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1593  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1594 
1595  res = PQexec(conn, "begin");
1597  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1598 
1599  res = PQprepare(conn, "insertion",
1600  "insert into ppln_uniqviol values ($1, $2) returning id",
1601  2, paramTypes);
1602  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1603  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1604 
1605  if (PQenterPipelineMode(conn) != 1)
1606  pg_fatal("failed to enter pipeline mode");
1607 
1608  while (!read_done)
1609  {
1610  /*
1611  * Avoid deadlocks by reading everything the server has sent before
1612  * sending anything. (Special precaution is needed here to process
1613  * PQisBusy before testing the socket for read-readiness, because the
1614  * socket does not turn read-ready after "sending" queries in aborted
1615  * pipeline mode.)
1616  */
1617  while (PQisBusy(conn) == 0)
1618  {
1619  bool new_error;
1620 
1621  if (results >= numsent)
1622  {
1623  if (write_done)
1624  read_done = true;
1625  break;
1626  }
1627 
1628  res = PQgetResult(conn);
1629  new_error = process_result(conn, res, results, numsent);
1630  if (new_error && got_error)
1631  pg_fatal("got two errors");
1632  got_error |= new_error;
1633  if (results++ >= numsent - 1)
1634  {
1635  if (write_done)
1636  read_done = true;
1637  break;
1638  }
1639  }
1640 
1641  if (read_done)
1642  break;
1643 
1644  FD_ZERO(&out_fds);
1645  FD_SET(sock, &out_fds);
1646 
1647  FD_ZERO(&in_fds);
1648  FD_SET(sock, &in_fds);
1649 
1650  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1651  {
1652  if (errno == EINTR)
1653  continue;
1654  pg_fatal("select() failed: %m");
1655  }
1656 
1657  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1658  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1659 
1660  /*
1661  * If the socket is writable and we haven't finished sending queries,
1662  * send some.
1663  */
1664  if (!write_done && FD_ISSET(sock, &out_fds))
1665  {
1666  for (;;)
1667  {
1668  int flush;
1669 
1670  /*
1671  * provoke uniqueness violation exactly once after having
1672  * switched to read mode.
1673  */
1674  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1675  {
1676  sprintf(paramValue0, "%d", numsent / 2);
1677  fprintf(stderr, "E");
1678  error_sent = true;
1679  }
1680  else
1681  {
1682  fprintf(stderr, ".");
1683  sprintf(paramValue0, "%d", ctr++);
1684  }
1685 
1686  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1687  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1688  numsent++;
1689 
1690  /* Are we done writing? */
1691  if (socketful != 0 && numsent % socketful == 42 && error_sent)
1692  {
1693  if (PQsendFlushRequest(conn) != 1)
1694  pg_fatal("failed to send flush request");
1695  write_done = true;
1696  fprintf(stderr, "\ndone writing\n");
1697  PQflush(conn);
1698  break;
1699  }
1700 
1701  /* is the outgoing socket full? */
1702  flush = PQflush(conn);
1703  if (flush == -1)
1704  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1705  if (flush == 1)
1706  {
1707  if (socketful == 0)
1708  socketful = numsent;
1709  fprintf(stderr, "\nswitch to reading\n");
1710  switched++;
1711  break;
1712  }
1713  }
1714  }
1715  }
1716 
1717  if (!got_error)
1718  pg_fatal("did not get expected error");
1719 
1720  fprintf(stderr, "ok\n");
1721 }
1722 
1723 /*
1724  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1725  * the expected NULL that should follow it.
1726  *
1727  * Returns true if we read a fatal error message, otherwise false.
1728  */
1729 static bool
1730 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1731 {
1732  PGresult *res2;
1733  bool got_error = false;
1734 
1735  if (res == NULL)
1736  pg_fatal("got unexpected NULL");
1737 
1738  switch (PQresultStatus(res))
1739  {
1740  case PGRES_FATAL_ERROR:
1741  got_error = true;
1742  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1743  PQclear(res);
1744 
1745  res2 = PQgetResult(conn);
1746  if (res2 != NULL)
1747  pg_fatal("expected NULL, got %s",
1748  PQresStatus(PQresultStatus(res2)));
1749  break;
1750 
1751  case PGRES_TUPLES_OK:
1752  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1753  PQclear(res);
1754 
1755  res2 = PQgetResult(conn);
1756  if (res2 != NULL)
1757  pg_fatal("expected NULL, got %s",
1758  PQresStatus(PQresultStatus(res2)));
1759  break;
1760 
1762  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1763  res2 = PQgetResult(conn);
1764  if (res2 != NULL)
1765  pg_fatal("expected NULL, got %s",
1766  PQresStatus(PQresultStatus(res2)));
1767  break;
1768 
1769  default:
1770  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1771  }
1772 
1773  return got_error;
1774 }
1775 
1776 
1777 static void
1778 usage(const char *progname)
1779 {
1780  fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
1781  fprintf(stderr, "Usage:\n");
1782  fprintf(stderr, " %s [OPTION] tests\n", progname);
1783  fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
1784  fprintf(stderr, "\nOptions:\n");
1785  fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1786  fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
1787 }
1788 
1789 static void
1791 {
1792  printf("disallowed_in_pipeline\n");
1793  printf("multi_pipelines\n");
1794  printf("nosync\n");
1795  printf("pipeline_abort\n");
1796  printf("pipeline_idle\n");
1797  printf("pipelined_insert\n");
1798  printf("prepared\n");
1799  printf("simple_pipeline\n");
1800  printf("singlerow\n");
1801  printf("transaction\n");
1802  printf("uniqviol\n");
1803 }
1804 
1805 int
1806 main(int argc, char **argv)
1807 {
1808  const char *conninfo = "";
1809  PGconn *conn;
1810  FILE *trace;
1811  char *testname;
1812  int numrows = 10000;
1813  PGresult *res;
1814  int c;
1815 
1816  while ((c = getopt(argc, argv, "r:t:")) != -1)
1817  {
1818  switch (c)
1819  {
1820  case 'r': /* numrows */
1821  errno = 0;
1822  numrows = strtol(optarg, NULL, 10);
1823  if (errno != 0 || numrows <= 0)
1824  {
1825  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1826  optarg);
1827  exit(1);
1828  }
1829  break;
1830  case 't': /* trace file */
1832  break;
1833  }
1834  }
1835 
1836  if (optind < argc)
1837  {
1838  testname = pg_strdup(argv[optind]);
1839  optind++;
1840  }
1841  else
1842  {
1843  usage(argv[0]);
1844  exit(1);
1845  }
1846 
1847  if (strcmp(testname, "tests") == 0)
1848  {
1849  print_test_list();
1850  exit(0);
1851  }
1852 
1853  if (optind < argc)
1854  {
1855  conninfo = pg_strdup(argv[optind]);
1856  optind++;
1857  }
1858 
1859  /* Make a connection to the database */
1860  conn = PQconnectdb(conninfo);
1861  if (PQstatus(conn) != CONNECTION_OK)
1862  {
1863  fprintf(stderr, "Connection to database failed: %s\n",
1864  PQerrorMessage(conn));
1865  exit_nicely(conn);
1866  }
1867 
1868  res = PQexec(conn, "SET lc_messages TO \"C\"");
1870  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1871  res = PQexec(conn, "SET debug_parallel_query = off");
1873  pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
1874 
1875  /* Set the trace file, if requested */
1876  if (tracefile != NULL)
1877  {
1878  if (strcmp(tracefile, "-") == 0)
1879  trace = stdout;
1880  else
1881  trace = fopen(tracefile, "w");
1882  if (trace == NULL)
1883  pg_fatal("could not open file \"%s\": %m", tracefile);
1884 
1885  /* Make it line-buffered */
1886  setvbuf(trace, NULL, PG_IOLBF, 0);
1887 
1888  PQtrace(conn, trace);
1891  }
1892 
1893  if (strcmp(testname, "disallowed_in_pipeline") == 0)
1895  else if (strcmp(testname, "multi_pipelines") == 0)
1897  else if (strcmp(testname, "nosync") == 0)
1898  test_nosync(conn);
1899  else if (strcmp(testname, "pipeline_abort") == 0)
1901  else if (strcmp(testname, "pipeline_idle") == 0)
1903  else if (strcmp(testname, "pipelined_insert") == 0)
1904  test_pipelined_insert(conn, numrows);
1905  else if (strcmp(testname, "prepared") == 0)
1907  else if (strcmp(testname, "simple_pipeline") == 0)
1909  else if (strcmp(testname, "singlerow") == 0)
1911  else if (strcmp(testname, "transaction") == 0)
1913  else if (strcmp(testname, "uniqviol") == 0)
1915  else
1916  {
1917  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1918  exit(1);
1919  }
1920 
1921  /* close the connection to the database and cleanup */
1922  PQfinish(conn);
1923  return 0;
1924 }
#define lengthof(array)
Definition: c.h:777
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6841
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6788
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4562
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:724
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6883
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7020
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6867
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1501
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1932
int PQflush(PGconn *conn)
Definition: fe-exec.c:3960
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2268
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3679
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3035
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2531
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3004
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3371
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2417
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2518
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3387
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2501
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3242
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3441
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2436
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2453
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2224
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1960
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3836
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3712
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3904
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1545
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1425
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3232
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3379
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2466
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2483
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2007
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1642
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3331
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3426
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3943
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3449
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2038
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
long val
Definition: informix.c:664
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:60
ExecStatusType
Definition: libpq-fe.h:95
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:108
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:110
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:111
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:112
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:100
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:415
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:158
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:160
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:417
static void usage(const char *progname)
static void const char * fmt
#define MAXINT8LEN
static void print_test_list(void)
static void const char pg_attribute_printf(2, 3)
static const char *const insert_sql2
static void const char fflush(stdout)
va_end(args)
static void exit_nicely(PGconn *conn)
#define MAXINTLEN
int main(int argc, char **argv)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
char * tracefile
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
vfprintf(stderr, fmt, args)
Assert(fmt[strlen(fmt) - 1] !='\n')
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
exit(1)
static const char *const insert_sql
#define pg_debug(...)
PipelineInsertStep
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX
static void pg_attribute_noreturn() pg_fatal_impl(int line
static void test_nosync(PGconn *conn)
const char *const progname
static void test_pipeline_abort(PGconn *conn)
static const char *const drop_table_sql
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
#define pg_fatal(...)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
va_start(args, fmt)
void pfree(void *pointer)
Definition: mcxt.c:1431
void * arg
PGDLLIMPORT int optind
Definition: getopt.c:50
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
PGDLLIMPORT char * optarg
Definition: getopt.c:52
#define PG_IOLBF
Definition: port.h:361
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
#define printf(...)
Definition: port.h:244
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
PGconn * conn
Definition: streamutil.c:54
const char * description
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495