PostgreSQL Source Code  git master
libpq_pipeline.c File Reference
#include "postgres_fe.h"
#include <sys/time.h>
#include "catalog/pg_type_d.h"
#include "common/fe_memutils.h"
#include "libpq-fe.h"
#include "pg_getopt.h"
#include "portability/instr_time.h"
Include dependency graph for libpq_pipeline.c:

Go to the source code of this file.

Macros

#define pg_debug(...)
 
#define MAXINTLEN   12
 
#define MAXINT8LEN   20
 
#define pg_fatal(...)   pg_fatal_impl(__LINE__, __VA_ARGS__)
 

Enumerations

enum  PipelineInsertStep {
  BI_BEGIN_TX, BI_DROP_TABLE, BI_CREATE_TABLE, BI_PREPARE,
  BI_INSERT_ROWS, BI_COMMIT_TX, BI_SYNC, BI_DONE
}
 

Functions

static void exit_nicely (PGconn *conn)
 
static bool process_result (PGconn *conn, PGresult *res, int results, int numsent)
 
static void pg_attribute_noreturn () pg_fatal_impl (int line, const char *fmt,...)
 
static void test_disallowed_in_pipeline (PGconn *conn)
 
static void test_multi_pipelines (PGconn *conn)
 
static void test_nosync (PGconn *conn)
 
static void test_pipeline_abort (PGconn *conn)
 
static void test_pipelined_insert (PGconn *conn, int n_rows)
 
static void test_prepared (PGconn *conn)
 
static void test_simple_pipeline (PGconn *conn)
 
static void test_singlerowmode (PGconn *conn)
 
static void test_transaction (PGconn *conn)
 
static void test_uniqviol (PGconn *conn)
 
static void usage (const char *progname)
 
static void print_test_list (void)
 
int main (int argc, char **argv)
 

Variables

const char *const progname = "libpq_pipeline"
 
char * tracefile = NULL
 
static const char *const drop_table_sql
 
static const char *const create_table_sql
 
static const char *const insert_sql
 
static const char *const insert_sql2
 

Macro Definition Documentation

◆ MAXINT8LEN

#define MAXINT8LEN   20

Definition at line 58 of file libpq_pipeline.c.

Referenced by test_pipelined_insert(), and test_uniqviol().

◆ MAXINTLEN

#define MAXINTLEN   12

Definition at line 57 of file libpq_pipeline.c.

Referenced by test_pipelined_insert().

◆ pg_debug

#define pg_debug (   ...)

Definition at line 43 of file libpq_pipeline.c.

Referenced by test_pipelined_insert().

◆ pg_fatal

#define pg_fatal (   ...)    pg_fatal_impl(__LINE__, __VA_ARGS__)

Enumeration Type Documentation

◆ PipelineInsertStep

Enumerator
BI_BEGIN_TX 
BI_DROP_TABLE 
BI_CREATE_TABLE 
BI_PREPARE 
BI_INSERT_ROWS 
BI_COMMIT_TX 
BI_SYNC 
BI_DONE 

Definition at line 620 of file libpq_pipeline.c.

Function Documentation

◆ exit_nicely()

static void exit_nicely ( PGconn * conn)
static

Definition at line 61 of file libpq_pipeline.c.

References PQfinish().

Referenced by main(), test_nosync(), and test_pipelined_insert().

62 {
63  PQfinish(conn);
64  exit(1);
65 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4231

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 1561 of file libpq_pipeline.c.

References conn, CONNECTION_OK, exit_nicely(), fprintf, getopt(), optarg, optind, pg_fatal, PG_IOLBF, pg_strdup(), PGRES_COMMAND_OK, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultStatus(), PQsetTraceFlags(), PQstatus(), PQtrace(), PQTRACE_REGRESS_MODE, PQTRACE_SUPPRESS_TIMESTAMPS, print_test_list(), test_disallowed_in_pipeline(), test_multi_pipelines(), test_nosync(), test_pipeline_abort(), test_pipelined_insert(), test_prepared(), test_simple_pipeline(), test_singlerowmode(), test_transaction(), test_uniqviol(), tracefile, and usage().

1562 {
1563  const char *conninfo = "";
1564  PGconn *conn;
1565  FILE *trace;
1566  char *testname;
1567  int numrows = 10000;
1568  PGresult *res;
1569  int c;
1570 
1571  while ((c = getopt(argc, argv, "t:r:")) != -1)
1572  {
1573  switch (c)
1574  {
1575  case 't': /* trace file */
1576  tracefile = pg_strdup(optarg);
1577  break;
1578  case 'r': /* numrows */
1579  errno = 0;
1580  numrows = strtol(optarg, NULL, 10);
1581  if (errno != 0 || numrows <= 0)
1582  {
1583  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1584  optarg);
1585  exit(1);
1586  }
1587  break;
1588  }
1589  }
1590 
1591  if (optind < argc)
1592  {
1593  testname = pg_strdup(argv[optind]);
1594  optind++;
1595  }
1596  else
1597  {
1598  usage(argv[0]);
1599  exit(1);
1600  }
1601 
1602  if (strcmp(testname, "tests") == 0)
1603  {
1604  print_test_list();
1605  exit(0);
1606  }
1607 
1608  if (optind < argc)
1609  {
1610  conninfo = pg_strdup(argv[optind]);
1611  optind++;
1612  }
1613 
1614  /* Make a connection to the database */
1615  conn = PQconnectdb(conninfo);
1616  if (PQstatus(conn) != CONNECTION_OK)
1617  {
1618  fprintf(stderr, "Connection to database failed: %s\n",
1619  PQerrorMessage(conn));
1620  exit_nicely(conn);
1621  }
1622 
1623  res = PQexec(conn, "SET lc_messages TO \"C\"");
1624  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1625  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1626  res = PQexec(conn, "SET force_parallel_mode = off");
1627  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1628  pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
1629 
1630  /* Set the trace file, if requested */
1631  if (tracefile != NULL)
1632  {
1633  trace = fopen(tracefile, "w");
1634  if (trace == NULL)
1635  pg_fatal("could not open file \"%s\": %m", tracefile);
1636 
1637  /* Make it line-buffered */
1638  setvbuf(trace, NULL, PG_IOLBF, 0);
1639 
1640  PQtrace(conn, trace);
1641  PQsetTraceFlags(conn,
1642  PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
1643  }
1644 
1645  if (strcmp(testname, "disallowed_in_pipeline") == 0)
1646  test_disallowed_in_pipeline(conn);
1647  else if (strcmp(testname, "multi_pipelines") == 0)
1648  test_multi_pipelines(conn);
1649  else if (strcmp(testname, "nosync") == 0)
1650  test_nosync(conn);
1651  else if (strcmp(testname, "pipeline_abort") == 0)
1652  test_pipeline_abort(conn);
1653  else if (strcmp(testname, "pipelined_insert") == 0)
1654  test_pipelined_insert(conn, numrows);
1655  else if (strcmp(testname, "prepared") == 0)
1656  test_prepared(conn);
1657  else if (strcmp(testname, "simple_pipeline") == 0)
1658  test_simple_pipeline(conn);
1659  else if (strcmp(testname, "singlerow") == 0)
1660  test_singlerowmode(conn);
1661  else if (strcmp(testname, "transaction") == 0)
1662  test_transaction(conn);
1663  else if (strcmp(testname, "uniqviol") == 0)
1664  test_uniqviol(conn);
1665  else
1666  {
1667  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1668  exit(1);
1669  }
1670 
1671  /* close the connection to the database and cleanup */
1672  PQfinish(conn);
1673  return 0;
1674 }
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
static void test_nosync(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4231
static void test_prepared(PGconn *conn)
#define fprintf
Definition: port.h:220
static void test_multi_pipelines(PGconn *conn)
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
static void test_pipeline_abort(PGconn *conn)
static void test_disallowed_in_pipeline(PGconn *conn)
int optind
Definition: getopt.c:50
PGconn * conn
Definition: streamutil.c:54
static void test_transaction(PGconn *conn)
char * c
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
char * tracefile
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:414
static void print_test_list(void)
#define PG_IOLBF
Definition: port.h:344
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
static void test_simple_pipeline(PGconn *conn)
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:412
static void test_uniqviol(PGconn *conn)
char * optarg
Definition: getopt.c:52
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
static void test_pipelined_insert(PGconn *conn, int n_rows)
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6691
static void exit_nicely(PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:713
static void usage(const char *progname)

◆ pg_attribute_noreturn()

static void pg_attribute_noreturn ( ) pg_fatal_impl ( int line, const char * fmt, ... )
static

Definition at line 72 of file libpq_pipeline.c.

References generate_unaccent_rules::args, Assert, fprintf, progname, generate_unaccent_rules::stdout, and vfprintf.

74 {
75  va_list args;
76 
77 
78  fflush(stdout);
79 
80  fprintf(stderr, "\n%s:%d: ", progname, line);
81  va_start(args, fmt);
82  vfprintf(stderr, fmt, args);
83  va_end(args);
84  Assert(fmt[strlen(fmt) - 1] != '\n');
85  fprintf(stderr, "\n");
86  exit(1);
87 }
const char *const progname
#define fprintf
Definition: port.h:220
#define Assert(condition)
Definition: c.h:804
#define vfprintf
Definition: port.h:219

◆ print_test_list()

static void print_test_list ( void  )
static

Definition at line 1546 of file libpq_pipeline.c.

References printf.

Referenced by main().

1547 {
1548  printf("disallowed_in_pipeline\n");
1549  printf("multi_pipelines\n");
1550  printf("nosync\n");
1551  printf("pipeline_abort\n");
1552  printf("pipelined_insert\n");
1553  printf("prepared\n");
1554  printf("simple_pipeline\n");
1555  printf("singlerow\n");
1556  printf("transaction\n");
1557  printf("uniqviol\n");
1558 }
#define printf(...)
Definition: port.h:222

◆ process_result()

static bool process_result ( PGconn * conn,
PGresult * res,
int  results,
int  numsent 
)
static

Definition at line 1486 of file libpq_pipeline.c.

References fprintf, pg_fatal, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetResult(), PQgetvalue(), PQresStatus(), and PQresultStatus().

Referenced by test_uniqviol().

1487 {
1488  PGresult *res2;
1489  bool got_error = false;
1490 
1491  if (res == NULL)
1492  pg_fatal("got unexpected NULL");
1493 
1494  switch (PQresultStatus(res))
1495  {
1496  case PGRES_FATAL_ERROR:
1497  got_error = true;
1498  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1499  PQclear(res);
1500 
1501  res2 = PQgetResult(conn);
1502  if (res2 != NULL)
1503  pg_fatal("expected NULL, got %s",
1504  PQresStatus(PQresultStatus(res2)));
1505  break;
1506 
1507  case PGRES_TUPLES_OK:
1508  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1509  PQclear(res);
1510 
1511  res2 = PQgetResult(conn);
1512  if (res2 != NULL)
1513  pg_fatal("expected NULL, got %s",
1514  PQresStatus(PQresultStatus(res2)));
1515  break;
1516 
1517  case PGRES_PIPELINE_ABORTED:
1518  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1519  res2 = PQgetResult(conn);
1520  if (res2 != NULL)
1521  pg_fatal("expected NULL, got %s",
1522  PQresStatus(PQresultStatus(res2)));
1523  break;
1524 
1525  default:
1526  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1527  }
1528 
1529  return got_error;
1530 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void PQclear(PGresult *res)
Definition: fe-exec.c:694
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ test_disallowed_in_pipeline()

static void test_disallowed_in_pipeline ( PGconn * conn)
static

Definition at line 90 of file libpq_pipeline.c.

References fprintf, pg_fatal, PGRES_FATAL_ERROR, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQisBusy(), PQisnonblocking(), PQpipelineStatus(), and PQresultStatus().

Referenced by main().

91 {
92  PGresult *res = NULL;
93 
94  fprintf(stderr, "test error cases... ");
95 
96  if (PQisnonblocking(conn))
97  pg_fatal("Expected blocking connection mode");
98 
99  if (PQenterPipelineMode(conn) != 1)
100  pg_fatal("Unable to enter pipeline mode");
101 
102  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
103  pg_fatal("Pipeline mode not activated properly");
104 
105  /* PQexec should fail in pipeline mode */
106  res = PQexec(conn, "SELECT 1");
107  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
108  pg_fatal("PQexec should fail in pipeline mode but succeeded");
109 
110  /* Entering pipeline mode when already in pipeline mode is OK */
111  if (PQenterPipelineMode(conn) != 1)
112  pg_fatal("re-entering pipeline mode should be a no-op but failed");
113 
114  if (PQisBusy(conn) != 0)
115  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
116 
117  /* ok, back to normal command mode */
118  if (PQexitPipelineMode(conn) != 1)
119  pg_fatal("couldn't exit idle empty pipeline mode");
120 
121  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
122  pg_fatal("Pipeline mode not terminated properly");
123 
124  /* exiting pipeline mode when not in pipeline mode should be a no-op */
125  if (PQexitPipelineMode(conn) != 1)
126  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
127 
128  /* can now PQexec again */
129  res = PQexec(conn, "SELECT 1");
130  if (PQresultStatus(res) != PGRES_TUPLES_OK)
131  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
132  PQerrorMessage(conn));
133 
134  fprintf(stderr, "ok\n");
135 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3747
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1951
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6786
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193

◆ test_multi_pipelines()

static void test_multi_pipelines ( PGconn * conn)
static

Definition at line 138 of file libpq_pipeline.c.

References fprintf, pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), and PQsendQueryParams().

Referenced by main().

139 {
140  PGresult *res = NULL;
141  const char *dummy_params[1] = {"1"};
142  Oid dummy_param_oids[1] = {INT4OID};
143 
144  fprintf(stderr, "multi pipeline... ");
145 
146  /*
147  * Queue up a couple of small pipelines and process each without returning
148  * to command mode first.
149  */
150  if (PQenterPipelineMode(conn) != 1)
151  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
152 
153  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
154  dummy_params, NULL, NULL, 0) != 1)
155  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
156 
157  if (PQpipelineSync(conn) != 1)
158  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
159 
160  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
161  dummy_params, NULL, NULL, 0) != 1)
162  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
163 
164  if (PQpipelineSync(conn) != 1)
165  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
166 
167  /* OK, start processing the results */
168  res = PQgetResult(conn);
169  if (res == NULL)
170  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
171  PQerrorMessage(conn));
172 
173  if (PQresultStatus(res) != PGRES_TUPLES_OK)
174  pg_fatal("Unexpected result code %s from first pipeline item",
175  PQresStatus(PQresultStatus(res)));
176  PQclear(res);
177  res = NULL;
178 
179  if (PQgetResult(conn) != NULL)
180  pg_fatal("PQgetResult returned something extra after first result");
181 
182  if (PQexitPipelineMode(conn) != 0)
183  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
184 
185  res = PQgetResult(conn);
186  if (res == NULL)
187  pg_fatal("PQgetResult returned null when sync result expected: %s",
188  PQerrorMessage(conn));
189 
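190  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
191  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
192  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));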
193  PQclear(res);
194 
195  /* second pipeline */
196 
197  res = PQgetResult(conn);
198  if (res == NULL)
199  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
200  PQerrorMessage(conn));
201 
202  if (PQresultStatus(res) != PGRES_TUPLES_OK)
203  pg_fatal("Unexpected result code %s from second pipeline item",
204  PQresStatus(PQresultStatus(res)));
205 
206  res = PQgetResult(conn);
207  if (res != NULL)
208  pg_fatal("Expected null result, got %s",
209  PQresStatus(PQresultStatus(res)));
210 
211  res = PQgetResult(conn);
212  if (res == NULL)
213  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
214  PQerrorMessage(conn));
215 
216  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
217  pg_fatal("Unexpected result code %s from second pipeline sync",
218  PQresStatus(PQresultStatus(res)));
219 
220  /* We're still in pipeline mode ... */
221  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
222  pg_fatal("Fell out of pipeline mode somehow");
223 
224  /* until we end it, which we can safely do now */
225  if (PQexitPipelineMode(conn) != 1)
226  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
227  PQerrorMessage(conn));
228 
229  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
230  pg_fatal("exiting pipeline mode didn't seem to work");
231 
232  fprintf(stderr, "ok\n");
233 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
unsigned int Oid
Definition: postgres_ext.h:31
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6786
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ test_nosync()

static void test_nosync ( PGconn * conn)
static

Definition at line 240 of file libpq_pipeline.c.

References exit_nicely(), fprintf, i, pg_fatal, PGRES_TUPLES_OK, PQclear(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQflush(), PQgetResult(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsocket(), select, and strerror.

Referenced by main().

241 {
242  int numqueries = 10;
243  int results = 0;
244  int sock = PQsocket(conn);
245 
246  fprintf(stderr, "nosync... ");
247 
248  if (sock < 0)
249  pg_fatal("invalid socket");
250 
251  if (PQenterPipelineMode(conn) != 1)
252  pg_fatal("could not enter pipeline mode");
253  for (int i = 0; i < numqueries; i++)
254  {
255  fd_set input_mask;
256  struct timeval tv;
257 
258  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
259  0, NULL, NULL, NULL, NULL, 0) != 1)
260  pg_fatal("error sending select: %s", PQerrorMessage(conn));
261  PQflush(conn);
262 
263  /*
264  * If the server has written anything to us, read (some of) it now.
265  */
266  FD_ZERO(&input_mask);
267  FD_SET(sock, &input_mask);
268  tv.tv_sec = 0;
269  tv.tv_usec = 0;
270  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
271  {
272  fprintf(stderr, "select() failed: %s\n", strerror(errno));
273  exit_nicely(conn);
274  }
275  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
276  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
277  }
278 
279  /* tell server to flush its output buffer */
280  if (PQsendFlushRequest(conn) != 1)
281  pg_fatal("failed to send flush request");
282  PQflush(conn);
283 
284  /* Now read all results */
285  for (;;)
286  {
287  PGresult *res;
288 
289  res = PQgetResult(conn);
290 
291  /* NULL results are only expected after TUPLES_OK */
292  if (res == NULL)
293  pg_fatal("got unexpected NULL result after %d results", results);
294 
295  /* We expect exactly one TUPLES_OK result for each query we sent */
296  if (PQresultStatus(res) == PGRES_TUPLES_OK)
297  {
298  PGresult *res2;
299 
300  /* and one NULL result should follow each */
301  res2 = PQgetResult(conn);
302  if (res2 != NULL)
303  pg_fatal("expected NULL, got %s",
304  PQresStatus(PQresultStatus(res2)));
305  PQclear(res);
306  results++;
307 
308  /* if we're done, we're done */
309  if (results == numqueries)
310  break;
311 
312  continue;
313  }
314 
315  /* anything else is unexpected */
316  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
317  }
318 
319  fprintf(stderr, "ok\n");
320 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3144
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define strerror
Definition: port.h:229
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6770
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
static void exit_nicely(PGconn *conn)

◆ test_pipeline_abort()

static void test_pipeline_abort ( PGconn * conn)
static

Definition at line 332 of file libpq_pipeline.c.

References create_table_sql, drop_table_sql, fprintf, i, insert_sql, PG_DIAG_SQLSTATE, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQ_PIPELINE_ABORTED, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultErrorField(), PQresultErrorMessage(), PQresultStatus(), PQsendQuery(), PQsendQueryParams(), PQsetSingleRowMode(), printf, and val.

Referenced by main().

333 {
334  PGresult *res = NULL;
335  const char *dummy_params[1] = {"1"};
336  Oid dummy_param_oids[1] = {INT4OID};
337  int i;
338  int gotrows;
339  bool goterror;
340 
341  fprintf(stderr, "aborted pipeline... ");
342 
343  res = PQexec(conn, drop_table_sql);
344  if (PQresultStatus(res) != PGRES_COMMAND_OK)
345  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
346 
347  res = PQexec(conn, create_table_sql);
348  if (PQresultStatus(res) != PGRES_COMMAND_OK)
349  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
350 
351  /*
352  * Queue up a couple of small pipelines and process each without returning
353  * to command mode first. Make sure the second operation in the first
354  * pipeline ERRORs.
355  */
356  if (PQenterPipelineMode(conn) != 1)
357  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
358 
359  dummy_params[0] = "1";
360  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
361  dummy_params, NULL, NULL, 0) != 1)
362  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
363 
364  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
365  1, dummy_param_oids, dummy_params,
366  NULL, NULL, 0) != 1)
367  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
368 
369  dummy_params[0] = "2";
370  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
371  dummy_params, NULL, NULL, 0) != 1)
372  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
373 
374  if (PQpipelineSync(conn) != 1)
375  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
376 
377  dummy_params[0] = "3";
378  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
379  dummy_params, NULL, NULL, 0) != 1)
380  pg_fatal("dispatching second-pipeline insert failed: %s",
381  PQerrorMessage(conn));
382 
383  if (PQpipelineSync(conn) != 1)
384  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
385 
386  /*
387  * OK, start processing the pipeline results.
388  *
389  * We should get a command-ok for the first query, then a fatal error and
390  * a pipeline aborted message for the second insert, a pipeline-end, then
391  * a command-ok and a pipeline-ok for the second pipeline operation.
392  */
393  res = PQgetResult(conn);
394  if (res == NULL)
395  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
396  if (PQresultStatus(res) != PGRES_COMMAND_OK)
397  pg_fatal("Unexpected result status %s: %s",
398  PQresStatus(PQresultStatus(res)),
399  PQresultErrorMessage(res));
400  PQclear(res);
401 
402  /* NULL result to signal end-of-results for this command */
403  if ((res = PQgetResult(conn)) != NULL)
404  pg_fatal("Expected null result, got %s",
405  PQresStatus(PQresultStatus(res)));
406 
407  /* Second query caused error, so we expect an error next */
408  res = PQgetResult(conn);
409  if (res == NULL)
410  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
411  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
412  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
413  PQresStatus(PQresultStatus(res)));
414  PQclear(res);
415 
416  /* NULL result to signal end-of-results for this command */
417  if ((res = PQgetResult(conn)) != NULL)
418  pg_fatal("Expected null result, got %s",
419  PQresStatus(PQresultStatus(res)));
420 
421  /*
422  * pipeline should now be aborted.
423  *
424  * Note that we could still queue more queries at this point if we wanted;
425  * they'd get added to a new third pipeline since we've already sent a
426  * second. The aborted flag relates only to the pipeline being received.
427  */
428  if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
429  pg_fatal("pipeline should be flagged as aborted but isn't");
430 
431  /* third query in pipeline, the second insert */
432  res = PQgetResult(conn);
433  if (res == NULL)
434  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
435  if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
436  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
437  PQresStatus(PQresultStatus(res)));
438  PQclear(res);
439 
440  /* NULL result to signal end-of-results for this command */
441  if ((res = PQgetResult(conn)) != NULL)
442  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
443 
444  if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
445  pg_fatal("pipeline should be flagged as aborted but isn't");
446 
447  /* Ensure we're still in pipeline */
448  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
449  pg_fatal("Fell out of pipeline mode somehow");
450 
451  /*
452  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
453  *
454  * (This is so clients know to start processing results normally again and
455  * can tell the difference between skipped commands and the sync.)
456  */
457  res = PQgetResult(conn);
458  if (res == NULL)
459  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
460  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
461  pg_fatal("Unexpected result code from first pipeline sync\n"
462  "Expected PGRES_PIPELINE_SYNC, got %s",
463  PQresStatus(PQresultStatus(res)));
464  PQclear(res);
465 
466  if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
467  pg_fatal("sync should've cleared the aborted flag but didn't");
468 
469  /* We're still in pipeline mode... */
470  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
471  pg_fatal("Fell out of pipeline mode somehow");
472 
473  /* the insert from the second pipeline */
474  res = PQgetResult(conn);
475  if (res == NULL)
476  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
477  if (PQresultStatus(res) != PGRES_COMMAND_OK)
478  pg_fatal("Unexpected result code %s from first item in second pipeline",
479  PQresStatus(PQresultStatus(res)));
480  PQclear(res);
481 
482  /* Read the NULL result at the end of the command */
483  if ((res = PQgetResult(conn)) != NULL)
484  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
485 
486  /* the second pipeline sync */
487  if ((res = PQgetResult(conn)) == NULL)
488  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
489  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
490  pg_fatal("Unexpected result code %s from second pipeline sync",
491  PQresStatus(PQresultStatus(res)));
492  PQclear(res);
493 
494  if ((res = PQgetResult(conn)) != NULL)
495  pg_fatal("Expected null result, got %s: %s",
496  PQresStatus(PQresultStatus(res)),
497  PQerrorMessage(conn));
498 
499  /* Try to send two queries in one command */
500  if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
501  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
502  if (PQpipelineSync(conn) != 1)
503  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
504  goterror = false;
505  while ((res = PQgetResult(conn)) != NULL)
506  {
507  switch (PQresultStatus(res))
508  {
509  case PGRES_FATAL_ERROR:
510  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
511  pg_fatal("expected error about multiple commands, got %s",
512  PQerrorMessage(conn));
513  printf("got expected %s", PQerrorMessage(conn));
514  goterror = true;
515  break;
516  default:
517  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
518  break;
519  }
520  }
521  if (!goterror)
522  pg_fatal("did not get cannot-insert-multiple-commands error");
523  res = PQgetResult(conn);
524  if (res == NULL)
525  pg_fatal("got NULL result");
526  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
527  pg_fatal("Unexpected result code %s from pipeline sync",
528  PQresStatus(PQresultStatus(res)));
529  fprintf(stderr, "ok\n");
530 
531  /* Test single-row mode with an error partways */
532  if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
533  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
534  if (PQpipelineSync(conn) != 1)
535  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
536  PQsetSingleRowMode(conn);
537  goterror = false;
538  gotrows = 0;
539  while ((res = PQgetResult(conn)) != NULL)
540  {
541  switch (PQresultStatus(res))
542  {
543  case PGRES_SINGLE_TUPLE:
544  printf("got row: %s\n", PQgetvalue(res, 0, 0));
545  gotrows++;
546  break;
547  case PGRES_FATAL_ERROR:
548  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
549  pg_fatal("expected division-by-zero, got: %s (%s)",
550  PQerrorMessage(conn),
551  PQresultErrorField(res, PG_DIAG_SQLSTATE));
552  printf("got expected division-by-zero\n");
553  goterror = true;
554  break;
555  default:
556  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
557  }
558  PQclear(res);
559  }
560  if (!goterror)
561  pg_fatal("did not get division-by-zero error");
562  if (gotrows != 3)
563  pg_fatal("did not get three rows");
564  /* the third pipeline sync */
565  if ((res = PQgetResult(conn)) == NULL)
566  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
567  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
568  pg_fatal("Unexpected result code %s from third pipeline sync",
569  PQresStatus(PQresultStatus(res)));
570  PQclear(res);
571 
572  /* We're still in pipeline mode... */
573  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
574  pg_fatal("Fell out of pipeline mode somehow");
575 
576  /* until we end it, which we can safely do now */
577  if (PQexitPipelineMode(conn) != 1)
578  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
579  PQerrorMessage(conn));
580 
581  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
582  pg_fatal("exiting pipeline mode didn't seem to work");
583 
584  fprintf(stderr, "ok\n");
585 
586  /*-
587  * Since we fired the pipelines off without a surrounding xact, the results
588  * should be:
589  *
590  * - Implicit xact started by server around 1st pipeline
591  * - First insert applied
592  * - Second statement aborted xact
593  * - Third insert skipped
594  * - Sync rolled back first implicit xact
595  * - Implicit xact created by server around 2nd pipeline
596  * - insert applied from 2nd pipeline
597  * - Sync commits 2nd xact
598  *
599  * So we should only have the value 3 that we inserted.
600  */
601  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
602 
603  if (PQresultStatus(res) != PGRES_TUPLES_OK)
604  pg_fatal("Expected tuples, got %s: %s",
605  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
606  if (PQntuples(res) != 1)
607  pg_fatal("expected 1 result, got %d", PQntuples(res));
608  for (i = 0; i < PQntuples(res); i++)
609  {
610  const char *val = PQgetvalue(res, i, 0);
611 
612  if (strcmp(val, "3") != 0)
613  pg_fatal("expected only insert with value 3, got %s", val);
614  }
615 
616  PQclear(res);
617 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define printf(...)
Definition: port.h:222
unsigned int Oid
Definition: postgres_ext.h:31
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1326
static const char *const create_table_sql
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1876
static const char *const insert_sql
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3233
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6786
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3194
static const char *const drop_table_sql
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
long val
Definition: informix.c:664
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ test_pipelined_insert()

static void test_pipelined_insert ( PGconn conn,
int  n_rows 
)
static

Definition at line 633 of file libpq_pipeline.c.

References Assert, BI_BEGIN_TX, BI_COMMIT_TX, BI_CREATE_TABLE, BI_DONE, BI_DROP_TABLE, BI_INSERT_ROWS, BI_PREPARE, BI_SYNC, create_table_sql, drop_table_sql, exit_nicely(), fprintf, insert_sql2, MAXINT8LEN, MAXINTLEN, pg_debug, pg_fatal, PGRES_COMMAND_OK, PGRES_PIPELINE_SYNC, PQclear(), PQcmdStatus(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQflush(), PQgetResult(), PQisBusy(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), select, snprintf, status(), generate_unaccent_rules::stdout, and strerror.

Referenced by main().

634 {
635  Oid insert_param_oids[2] = {INT4OID, INT8OID};
636  const char *insert_params[2];
637  char insert_param_0[MAXINTLEN];
638  char insert_param_1[MAXINT8LEN];
639  enum PipelineInsertStep send_step = BI_BEGIN_TX,
640  recv_step = BI_BEGIN_TX;
641  int rows_to_send,
642  rows_to_receive;
643 
644  insert_params[0] = insert_param_0;
645  insert_params[1] = insert_param_1;
646 
647  rows_to_send = rows_to_receive = n_rows;
648 
649  /*
650  * Do a pipelined insert into a table created at the start of the pipeline
651  */
652  if (PQenterPipelineMode(conn) != 1)
653  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
654 
655  while (send_step != BI_PREPARE)
656  {
657  const char *sql;
658 
659  switch (send_step)
660  {
661  case BI_BEGIN_TX:
662  sql = "BEGIN TRANSACTION";
663  send_step = BI_DROP_TABLE;
664  break;
665 
666  case BI_DROP_TABLE:
667  sql = drop_table_sql;
668  send_step = BI_CREATE_TABLE;
669  break;
670 
671  case BI_CREATE_TABLE:
672  sql = create_table_sql;
673  send_step = BI_PREPARE;
674  break;
675 
676  default:
677  pg_fatal("invalid state");
678  sql = NULL; /* keep compiler quiet */
679  }
680 
681  pg_debug("sending: %s\n", sql);
682  if (PQsendQueryParams(conn, sql,
683  0, NULL, NULL, NULL, NULL, 0) != 1)
684  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
685  }
686 
687  Assert(send_step == BI_PREPARE);
688  pg_debug("sending: %s\n", insert_sql2);
689  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
690  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
691  send_step = BI_INSERT_ROWS;
692 
693  /*
694  * Now we start inserting. We'll be sending enough data that we could fill
695  * our output buffer, so to avoid deadlocking we need to enter nonblocking
696  * mode and consume input while we send more output. As results of each
697  * query are processed we should pop them to allow processing of the next
698  * query. There's no need to finish the pipeline before processing
699  * results.
700  */
701  if (PQsetnonblocking(conn, 1) != 0)
702  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
703 
704  while (recv_step != BI_DONE)
705  {
706  int sock;
707  fd_set input_mask;
708  fd_set output_mask;
709 
710  sock = PQsocket(conn);
711 
712  if (sock < 0)
713  break; /* shouldn't happen */
714 
715  FD_ZERO(&input_mask);
716  FD_SET(sock, &input_mask);
717  FD_ZERO(&output_mask);
718  FD_SET(sock, &output_mask);
719 
720  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
721  {
722  fprintf(stderr, "select() failed: %s\n", strerror(errno));
723  exit_nicely(conn);
724  }
725 
726  /*
727  * Process any results, so we keep the server's output buffer free
728  * flowing and it can continue to process input
729  */
730  if (FD_ISSET(sock, &input_mask))
731  {
732  PQconsumeInput(conn);
733 
734  /* Read until we'd block if we tried to read */
735  while (!PQisBusy(conn) && recv_step < BI_DONE)
736  {
737  PGresult *res;
738  const char *cmdtag = "";
739  const char *description = "";
740  int status;
741 
742  /*
743  * Read next result. If no more results from this query,
744  * advance to the next query
745  */
746  res = PQgetResult(conn);
747  if (res == NULL)
748  continue;
749 
750  status = PGRES_COMMAND_OK;
751  switch (recv_step)
752  {
753  case BI_BEGIN_TX:
754  cmdtag = "BEGIN";
755  recv_step++;
756  break;
757  case BI_DROP_TABLE:
758  cmdtag = "DROP TABLE";
759  recv_step++;
760  break;
761  case BI_CREATE_TABLE:
762  cmdtag = "CREATE TABLE";
763  recv_step++;
764  break;
765  case BI_PREPARE:
766  cmdtag = "";
767  description = "PREPARE";
768  recv_step++;
769  break;
770  case BI_INSERT_ROWS:
771  cmdtag = "INSERT";
772  rows_to_receive--;
773  if (rows_to_receive == 0)
774  recv_step++;
775  break;
776  case BI_COMMIT_TX:
777  cmdtag = "COMMIT";
778  recv_step++;
779  break;
780  case BI_SYNC:
781  cmdtag = "";
782  description = "SYNC";
783  status = PGRES_PIPELINE_SYNC;
784  recv_step++;
785  break;
786  case BI_DONE:
787  /* unreachable */
788  pg_fatal("unreachable state");
789  }
790 
791  if (PQresultStatus(res) != status)
792  pg_fatal("%s reported status %s, expected %s\n"
793  "Error message: \"%s\"",
794  description, PQresStatus(PQresultStatus(res)),
795  PQresStatus(status), PQerrorMessage(conn));
796 
797  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
798  pg_fatal("%s expected command tag '%s', got '%s'",
799  description, cmdtag, PQcmdStatus(res));
800 
801  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
802 
803  PQclear(res);
804  }
805  }
806 
807  /* Write more rows and/or the end pipeline message, if needed */
808  if (FD_ISSET(sock, &output_mask))
809  {
810  PQflush(conn);
811 
812  if (send_step == BI_INSERT_ROWS)
813  {
814  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
815  /* use up some buffer space with a wide value */
816  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
817 
818  if (PQsendQueryPrepared(conn, "my_insert",
819  2, insert_params, NULL, NULL, 0) == 1)
820  {
821  pg_debug("sent row %d\n", rows_to_send);
822 
823  rows_to_send--;
824  if (rows_to_send == 0)
825  send_step++;
826  }
827  else
828  {
829  /*
830  * in nonblocking mode, so it's OK for an insert to fail
831  * to send
832  */
833  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
834  rows_to_send, PQerrorMessage(conn));
835  }
836  }
837  else if (send_step == BI_COMMIT_TX)
838  {
839  if (PQsendQueryParams(conn, "COMMIT",
840  0, NULL, NULL, NULL, NULL, 0) == 1)
841  {
842  pg_debug("sent COMMIT\n");
843  send_step++;
844  }
845  else
846  {
847  fprintf(stderr, "WARNING: failed to send commit: %s\n",
848  PQerrorMessage(conn));
849  }
850  }
851  else if (send_step == BI_SYNC)
852  {
853  if (PQpipelineSync(conn) == 1)
854  {
855  fprintf(stdout, "pipeline sync sent\n");
856  send_step++;
857  }
858  else
859  {
860  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
861  PQerrorMessage(conn));
862  }
863  }
864  }
865  }
866 
867  /* We've got the sync message and the pipeline should be done */
868  if (PQexitPipelineMode(conn) != 1)
869  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
870  PQerrorMessage(conn));
871 
872  if (PQsetnonblocking(conn, 0) != 0)
873  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
874 
875  fprintf(stderr, "ok\n");
876 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
static const char *const insert_sql2
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
PipelineInsertStep
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define pg_debug(...)
unsigned int Oid
Definition: postgres_ext.h:31
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
static const char *const create_table_sql
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
#define MAXINTLEN
#define MAXINT8LEN
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3519
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3710
void PQclear(PGresult *res)
Definition: fe-exec.c:694
#define Assert(condition)
Definition: c.h:804
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1951
#define strerror
Definition: port.h:229
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1584
static const char *const drop_table_sql
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
#define snprintf
Definition: port.h:216
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6770
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
static void exit_nicely(PGconn *conn)
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1484

◆ test_prepared()

static void test_prepared ( PGconn conn)
static

Definition at line 879 of file libpq_pipeline.c.

References fprintf, i, lengthof, pg_fatal, PGRES_COMMAND_OK, PGRES_PIPELINE_SYNC, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQftype(), PQgetResult(), PQnfields(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendDescribePortal(), PQsendDescribePrepared(), and PQsendPrepare().

Referenced by main().

880 {
881  PGresult *res = NULL;
882  Oid param_oids[1] = {INT4OID};
883  Oid expected_oids[4];
884  Oid typ;
885 
886  fprintf(stderr, "prepared... ");
887 
888  if (PQenterPipelineMode(conn) != 1)
889  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
890  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
891  "interval '1 sec'",
892  1, param_oids) != 1)
893  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
894  expected_oids[0] = INT4OID;
895  expected_oids[1] = TEXTOID;
896  expected_oids[2] = NUMERICOID;
897  expected_oids[3] = INTERVALOID;
898  if (PQsendDescribePrepared(conn, "select_one") != 1)
899  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
900  if (PQpipelineSync(conn) != 1)
901  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
902 
903  res = PQgetResult(conn);
904  if (res == NULL)
905  pg_fatal("PQgetResult returned null");
906  if (PQresultStatus(res) != PGRES_COMMAND_OK)
907  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
908  PQclear(res);
909  res = PQgetResult(conn);
910  if (res != NULL)
911  pg_fatal("expected NULL result");
912 
913  res = PQgetResult(conn);
914  if (res == NULL)
915  pg_fatal("PQgetResult returned NULL");
916  if (PQresultStatus(res) != PGRES_COMMAND_OK)
917  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
918  if (PQnfields(res) != lengthof(expected_oids))
919  pg_fatal("expected %d columns, got %d",
920  lengthof(expected_oids), PQnfields(res));
921  for (int i = 0; i < PQnfields(res); i++)
922  {
923  typ = PQftype(res, i);
924  if (typ != expected_oids[i])
925  pg_fatal("field %d: expected type %u, got %u",
926  i, expected_oids[i], typ);
927  }
928  PQclear(res);
929  res = PQgetResult(conn);
930  if (res != NULL)
931  pg_fatal("expected NULL result");
932 
933  res = PQgetResult(conn);
935  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
936 
937  if (PQexitPipelineMode(conn) != 1)
938  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
939 
940  PQexec(conn, "BEGIN");
941  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
942  PQenterPipelineMode(conn);
943  if (PQsendDescribePortal(conn, "cursor_one") != 1)
944  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
945  if (PQpipelineSync(conn) != 1)
946  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
947  res = PQgetResult(conn);
948  if (res == NULL)
949  pg_fatal("expected NULL result");
950  if (PQresultStatus(res) != PGRES_COMMAND_OK)
951  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
952 
953  typ = PQftype(res, 0);
954  if (typ != INT4OID)
955  pg_fatal("portal: expected type %u, got %u",
956  INT4OID, typ);
957  PQclear(res);
958  res = PQgetResult(conn);
959  if (res != NULL)
960  pg_fatal("expected NULL result");
961  res = PQgetResult(conn);
963  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
964 
965  if (PQexitPipelineMode(conn) != 1)
966  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
967 
968  fprintf(stderr, "ok\n");
969 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3256
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2435
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define lengthof(array)
Definition: c.h:734
unsigned int Oid
Definition: postgres_ext.h:31
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3486
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2422
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1484

◆ test_simple_pipeline()

static void test_simple_pipeline ( PGconn conn)
static

Definition at line 972 of file libpq_pipeline.c.

References fprintf, pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQisnonblocking(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), and PQsendQueryParams().

Referenced by main().

973 {
974  PGresult *res = NULL;
975  const char *dummy_params[1] = {"1"};
976  Oid dummy_param_oids[1] = {INT4OID};
977 
978  fprintf(stderr, "simple pipeline... ");
979 
980  /*
981  * Enter pipeline mode and dispatch a set of operations, which we'll then
982  * process the results of as they come in.
983  *
984  * For a simple case we should be able to do this without interim
985  * processing of results since our output buffer will give us enough slush
986  * to work with and we won't block on sending. So blocking mode is fine.
987  */
988  if (PQisnonblocking(conn))
989  pg_fatal("Expected blocking connection mode");
990 
991  if (PQenterPipelineMode(conn) != 1)
992  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
993 
994  if (PQsendQueryParams(conn, "SELECT $1",
995  1, dummy_param_oids, dummy_params,
996  NULL, NULL, 0) != 1)
997  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
998 
999  if (PQexitPipelineMode(conn) != 0)
1000  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1001 
1002  if (PQpipelineSync(conn) != 1)
1003  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1004 
1005  res = PQgetResult(conn);
1006  if (res == NULL)
1007  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1008  PQerrorMessage(conn));
1009 
1010  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1011  pg_fatal("Unexpected result code %s from first pipeline item",
1012  PQresStatus(PQresultStatus(res)));
1013 
1014  PQclear(res);
1015  res = NULL;
1016 
1017  if (PQgetResult(conn) != NULL)
1018  pg_fatal("PQgetResult returned something extra after first query result.");
1019 
1020  /*
1021  * Even though we've processed the result there's still a sync to come and
1022  * we can't exit pipeline mode yet
1023  */
1024  if (PQexitPipelineMode(conn) != 0)
1025  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1026 
1027  res = PQgetResult(conn);
1028  if (res == NULL)
1029  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1030  PQerrorMessage(conn));
1031 
1032  if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1033  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1035 
1036  PQclear(res);
1037  res = NULL;
1038 
1039  if (PQgetResult(conn) != NULL)
1040  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1041  PQresStatus(PQresultStatus(res)));
1042 
1043  /* We're still in pipeline mode... */
1044  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1045  pg_fatal("Fell out of pipeline mode somehow");
1046 
1047  /* ... until we end it, which we can safely do now */
1048  if (PQexitPipelineMode(conn) != 1)
1049  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1050  PQerrorMessage(conn));
1051 
1052  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1053  pg_fatal("Exiting pipeline mode didn't seem to work");
1054 
1055  fprintf(stderr, "ok\n");
1056 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3747
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
unsigned int Oid
Definition: postgres_ext.h:31
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:6786
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ test_singlerowmode()

static void test_singlerowmode ( PGconn conn)
static

Definition at line 1059 of file libpq_pipeline.c.

References fprintf, i, pfree(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendQueryParams(), PQsetSingleRowMode(), and psprintf().

Referenced by main().

1060 {
1061  PGresult *res;
1062  int i;
1063  bool pipeline_ended = false;
1064 
1065  /* 1 pipeline, 3 queries in it */
1066  if (PQenterPipelineMode(conn) != 1)
1067  pg_fatal("failed to enter pipeline mode: %s",
1068  PQerrorMessage(conn));
1069 
1070  for (i = 0; i < 3; i++)
1071  {
1072  char *param[1];
1073 
1074  param[0] = psprintf("%d", 44 + i);
1075 
1076  if (PQsendQueryParams(conn,
1077  "SELECT generate_series(42, $1)",
1078  1,
1079  NULL,
1080  (const char **) param,
1081  NULL,
1082  NULL,
1083  0) != 1)
1084  pg_fatal("failed to send query: %s",
1085  PQerrorMessage(conn));
1086  pfree(param[0]);
1087  }
1088  if (PQpipelineSync(conn) != 1)
1089  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1090 
1091  for (i = 0; !pipeline_ended; i++)
1092  {
1093  bool first = true;
1094  bool saw_ending_tuplesok;
1095  bool isSingleTuple = false;
1096 
1097  /* Set single row mode for only first 2 SELECT queries */
1098  if (i < 2)
1099  {
1100  if (PQsetSingleRowMode(conn) != 1)
1101  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1102  }
1103 
1104  /* Consume rows for this query */
1105  saw_ending_tuplesok = false;
1106  while ((res = PQgetResult(conn)) != NULL)
1107  {
1108  ExecStatusType est = PQresultStatus(res);
1109 
1110  if (est == PGRES_PIPELINE_SYNC)
1111  {
1112  fprintf(stderr, "end of pipeline reached\n");
1113  pipeline_ended = true;
1114  PQclear(res);
1115  if (i != 3)
1116  pg_fatal("Expected three results, got %d", i);
1117  break;
1118  }
1119 
1120  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1121  if (first)
1122  {
1123  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1124  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1125  i, PQresStatus(est));
1126  if (i >= 2 && est != PGRES_TUPLES_OK)
1127  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1128  i, PQresStatus(est));
1129  first = false;
1130  }
1131 
1132  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1133  switch (est)
1134  {
1135  case PGRES_TUPLES_OK:
1136  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1137  saw_ending_tuplesok = true;
1138  if (isSingleTuple)
1139  {
1140  if (PQntuples(res) == 0)
1141  fprintf(stderr, "all tuples received in query %d\n", i);
1142  else
1143  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1144  }
1145  break;
1146 
1147  case PGRES_SINGLE_TUPLE:
1148  isSingleTuple = true;
1149  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1150  break;
1151 
1152  default:
1153  pg_fatal("unexpected");
1154  }
1155  PQclear(res);
1156  }
1157  if (!pipeline_ended && !saw_ending_tuplesok)
1158  pg_fatal("didn't get expected terminating TUPLES_OK");
1159  }
1160 
1161  if (PQexitPipelineMode(conn) != 1)
1162  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1163 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
ExecStatusType
Definition: libpq-fe.h:92
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void pfree(void *pointer)
Definition: mcxt.c:1169
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1876
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int i
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ test_transaction()

static void test_transaction ( PGconn conn)
static

Definition at line 1170 of file libpq_pipeline.c.

References fprintf, i, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), and printf.

Referenced by main().

1171 {
1172  PGresult *res;
1173  bool expect_null;
1174  int num_syncs = 0;
1175 
1176  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1177  "CREATE TABLE pq_pipeline_tst (id int)");
1178  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1179  pg_fatal("failed to create test table: %s",
1180  PQerrorMessage(conn));
1181  PQclear(res);
1182 
1183  if (PQenterPipelineMode(conn) != 1)
1184  pg_fatal("failed to enter pipeline mode: %s",
1185  PQerrorMessage(conn));
1186  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1187  pg_fatal("could not send prepare on pipeline: %s",
1188  PQerrorMessage(conn));
1189 
1190  if (PQsendQueryParams(conn,
1191  "BEGIN",
1192  0, NULL, NULL, NULL, NULL, 0) != 1)
1193  pg_fatal("failed to send query: %s",
1194  PQerrorMessage(conn));
1195  if (PQsendQueryParams(conn,
1196  "SELECT 0/0",
1197  0, NULL, NULL, NULL, NULL, 0) != 1)
1198  pg_fatal("failed to send query: %s",
1199  PQerrorMessage(conn));
1200 
1201  /*
1202  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1203  * get out of the pipeline-aborted state first.
1204  */
1205  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1206  pg_fatal("failed to execute prepared: %s",
1207  PQerrorMessage(conn));
1208 
1209  /* This insert fails because we're in pipeline-aborted state */
1210  if (PQsendQueryParams(conn,
1211  "INSERT INTO pq_pipeline_tst VALUES (1)",
1212  0, NULL, NULL, NULL, NULL, 0) != 1)
1213  pg_fatal("failed to send query: %s",
1214  PQerrorMessage(conn));
1215  if (PQpipelineSync(conn) != 1)
1216  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1217  num_syncs++;
1218 
1219  /*
1220  * This insert fails even though the pipeline got a SYNC, because we're in
1221  * an aborted transaction
1222  */
1223  if (PQsendQueryParams(conn,
1224  "INSERT INTO pq_pipeline_tst VALUES (2)",
1225  0, NULL, NULL, NULL, NULL, 0) != 1)
1226  pg_fatal("failed to send query: %s",
1227  PQerrorMessage(conn));
1228  if (PQpipelineSync(conn) != 1)
1229  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1230  num_syncs++;
1231 
1232  /*
1233  * Send ROLLBACK using prepared stmt. This one works because we just did
1234  * PQpipelineSync above.
1235  */
1236  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1237  pg_fatal("failed to execute prepared: %s",
1238  PQerrorMessage(conn));
1239 
1240  /*
1241  * Now that we're out of a transaction and in pipeline-good mode, this
1242  * insert works
1243  */
1244  if (PQsendQueryParams(conn,
1245  "INSERT INTO pq_pipeline_tst VALUES (3)",
1246  0, NULL, NULL, NULL, NULL, 0) != 1)
1247  pg_fatal("failed to send query: %s",
1248  PQerrorMessage(conn));
1249  /* Send two syncs now -- match up to SYNC messages below */
1250  if (PQpipelineSync(conn) != 1)
1251  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1252  num_syncs++;
1253  if (PQpipelineSync(conn) != 1)
1254  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1255  num_syncs++;
1256 
1257  expect_null = false;
1258  for (int i = 0;; i++)
1259  {
1260  ExecStatusType restype;
1261 
1262  res = PQgetResult(conn);
1263  if (res == NULL)
1264  {
1265  printf("%d: got NULL result\n", i);
1266  if (!expect_null)
1267  pg_fatal("did not expect NULL here");
1268  expect_null = false;
1269  continue;
1270  }
1271  restype = PQresultStatus(res);
1272  printf("%d: got status %s", i, PQresStatus(restype));
1273  if (expect_null)
1274  pg_fatal("expected NULL");
1275  if (restype == PGRES_FATAL_ERROR)
1276  printf("; error: %s", PQerrorMessage(conn));
1277  else if (restype == PGRES_PIPELINE_ABORTED)
1278  {
1279  printf(": command didn't run because pipeline aborted\n");
1280  }
1281  else
1282  printf("\n");
1283  PQclear(res);
1284 
1285  if (restype == PGRES_PIPELINE_SYNC)
1286  num_syncs--;
1287  else
1288  expect_null = true;
1289  if (num_syncs <= 0)
1290  break;
1291  }
1292  if (PQgetResult(conn) != NULL)
1293  pg_fatal("returned something extra after all the syncs: %s",
1294  PQresStatus(PQresultStatus(res)));
1295 
1296  if (PQexitPipelineMode(conn) != 1)
1297  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1298 
1299  /* We expect to find one tuple containing the value "3" */
1300  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1301  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1302  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1303  if (PQntuples(res) != 1)
1304  pg_fatal("did not get 1 tuple");
1305  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1306  pg_fatal("did not get expected tuple");
1307  PQclear(res);
1308 
1309  fprintf(stderr, "ok\n");
1310 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:2926
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1438
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3642
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3186
#define printf(...)
Definition: port.h:222
ExecStatusType
Definition: libpq-fe.h:92
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3248
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3077
void PQclear(PGresult *res)
Definition: fe-exec.c:694
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1584
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1484

◆ test_uniqviol()

static void test_uniqviol ( PGconn *  conn)
static

Definition at line 1318 of file libpq_pipeline.c.

References EINTR, fprintf, MAXINT8LEN, pg_fatal, PGRES_COMMAND_OK, PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQflush(), PQgetResult(), PQisBusy(), PQprepare(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), process_result(), select, and sprintf.

Referenced by main().

1319 {
1320  int sock = PQsocket(conn);
1321  PGresult *res;
1322  Oid paramTypes[2] = {INT8OID, INT8OID};
1323  const char *paramValues[2];
1324  char paramValue0[MAXINT8LEN];
1325  char paramValue1[MAXINT8LEN];
1326  int ctr = 0;
1327  int numsent = 0;
1328  int results = 0;
1329  bool read_done = false;
1330  bool write_done = false;
1331  bool error_sent = false;
1332  bool got_error = false;
1333  int switched = 0;
1334  int socketful = 0;
1335  fd_set in_fds;
1336  fd_set out_fds;
1337 
1338  fprintf(stderr, "uniqviol ...");
1339 
1340  PQsetnonblocking(conn, 1);
1341 
1342  paramValues[0] = paramValue0;
1343  paramValues[1] = paramValue1;
1344  sprintf(paramValue1, "42");
1345 
1346  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1347  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1348  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1349  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1350 
1351  res = PQexec(conn, "begin");
1352  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1353  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1354 
1355  res = PQprepare(conn, "insertion",
1356  "insert into ppln_uniqviol values ($1, $2) returning id",
1357  2, paramTypes);
1358  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1359  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1360 
1361  if (PQenterPipelineMode(conn) != 1)
1362  pg_fatal("failed to enter pipeline mode");
1363 
1364  while (!read_done)
1365  {
1366  /*
1367  * Avoid deadlocks by reading everything the server has sent before
1368  * sending anything. (Special precaution is needed here to process
1369  * PQisBusy before testing the socket for read-readiness, because the
1370  * socket does not turn read-ready after "sending" queries in aborted
1371  * pipeline mode.)
1372  */
1373  while (PQisBusy(conn) == 0)
1374  {
1375  bool new_error;
1376 
1377  if (results >= numsent)
1378  {
1379  if (write_done)
1380  read_done = true;
1381  break;
1382  }
1383 
1384  res = PQgetResult(conn);
1385  new_error = process_result(conn, res, results, numsent);
1386  if (new_error && got_error)
1387  pg_fatal("got two errors");
1388  got_error |= new_error;
1389  if (results++ >= numsent - 1)
1390  {
1391  if (write_done)
1392  read_done = true;
1393  break;
1394  }
1395  }
1396 
1397  if (read_done)
1398  break;
1399 
1400  FD_ZERO(&out_fds);
1401  FD_SET(sock, &out_fds);
1402 
1403  FD_ZERO(&in_fds);
1404  FD_SET(sock, &in_fds);
1405 
1406  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1407  {
1408  if (errno == EINTR)
1409  continue;
1410  pg_fatal("select() failed: %m");
1411  }
1412 
1413  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1414  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1415 
1416  /*
1417  * If the socket is writable and we haven't finished sending queries,
1418  * send some.
1419  */
1420  if (!write_done && FD_ISSET(sock, &out_fds))
1421  {
1422  for (;;)
1423  {
1424  int flush;
1425 
1426  /*
1427  * provoke uniqueness violation exactly once after having
1428  * switched to read mode.
1429  */
1430  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1431  {
1432  sprintf(paramValue0, "%d", numsent / 2);
1433  fprintf(stderr, "E");
1434  error_sent = true;
1435  }
1436  else
1437  {
1438  fprintf(stderr, ".");
1439  sprintf(paramValue0, "%d", ctr++);
1440  }
1441 
1442  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1443  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1444  numsent++;
1445 
1446  /* Are we done writing? */
1447  if (socketful != 0 && numsent % socketful == 42 && error_sent)
1448  {
1449  if (PQsendFlushRequest(conn) != 1)
1450  pg_fatal("failed to send flush request");
1451  write_done = true;
1452  fprintf(stderr, "\ndone writing\n");
1453  PQflush(conn);
1454  break;
1455  }
1456 
1457  /* is the outgoing socket full? */
1458  flush = PQflush(conn);
1459  if (flush == -1)
1460  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1461  if (flush == 1)
1462  {
1463  if (socketful == 0)
1464  socketful = numsent;
1465  fprintf(stderr, "\nswitch to reading\n");
1466  switched++;
1467  break;
1468  }
1469  }
1470  }
1471  }
1472 
1473  if (!got_error)
1474  pg_fatal("did not get expected error");
1475 
1476  fprintf(stderr, "ok\n");
1477 }
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2237
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_fatal(...)
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:2894
unsigned int Oid
Definition: postgres_ext.h:31
#define fprintf
Definition: port.h:220
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
#define sprintf
Definition: port.h:218
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3144
int PQflush(PGconn *conn)
Definition: fe-exec.c:3766
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
#define MAXINT8LEN
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1904
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3710
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1951
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1584
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193
#define EINTR
Definition: win32_port.h:343
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6770
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978

◆ usage()

static void usage ( const char *  progname)
static

Definition at line 1534 of file libpq_pipeline.c.

References fprintf.

Referenced by main().

/*
 * usage
 *
 * Write the command-line help text to stderr.  The given program name is
 * substituted into the banner line and into both synopsis lines.
 */
static void
usage(const char *progname)
{
	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n"
			"\nOptions:\n"
			" -t TRACEFILE generate a libpq trace to TRACEFILE\n"
			" -r NUMROWS use NUMROWS as the test size\n",
			progname, progname, progname);
}
const char *const progname
#define fprintf
Definition: port.h:220

Variable Documentation

◆ create_table_sql

const char* const create_table_sql
static
Initial value:
=
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
"int8filler int8);"

Definition at line 48 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ drop_table_sql

const char* const drop_table_sql
static
Initial value:
=
"DROP TABLE IF EXISTS pq_pipeline_demo"

Definition at line 46 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ insert_sql

const char* const insert_sql
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)"

Definition at line 51 of file libpq_pipeline.c.

Referenced by test_pipeline_abort().

◆ insert_sql2

const char* const insert_sql2
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)"

Definition at line 53 of file libpq_pipeline.c.

Referenced by test_pipelined_insert().

◆ progname

const char* const progname = "libpq_pipeline"

Definition at line 34 of file libpq_pipeline.c.

Referenced by pg_attribute_noreturn().

◆ tracefile

char* tracefile = NULL

Definition at line 37 of file libpq_pipeline.c.

Referenced by main().