PostgreSQL Source Code  git master
libpq_pipeline.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/time.h>
#include "catalog/pg_type_d.h"
#include "common/fe_memutils.h"
#include "libpq-fe.h"
#include "pg_getopt.h"
#include "portability/instr_time.h"
Include dependency graph for libpq_pipeline.c:

Go to the source code of this file.

Macros

#define pg_debug(...)
 
#define MAXINTLEN   12
 
#define MAXINT8LEN   20
 
#define pg_fatal(...)   pg_fatal_impl(__LINE__, __VA_ARGS__)
 
#define confirm_query_canceled(conn)   confirm_query_canceled_impl(__LINE__, conn)
 
#define send_cancellable_query(conn, monitorConn)    send_cancellable_query_impl(__LINE__, conn, monitorConn)
 

Enumerations

enum  PipelineInsertStep {
  BI_BEGIN_TX , BI_DROP_TABLE , BI_CREATE_TABLE , BI_PREPARE ,
  BI_INSERT_ROWS , BI_COMMIT_TX , BI_SYNC , BI_DONE
}
 

Functions

static void exit_nicely (PGconn *conn)
 
static void pg_attribute_noreturn () pg_fatal_impl(int line
 
static void const char pg_attribute_printf (2, 3)
 
static bool process_result (PGconn *conn, PGresult *res, int results, int numsent)
 
static void const char fflush (stdout)
 
 fprintf (stderr, "\n%s:%d: ", progname, line)
 
 va_start (args, fmt)
 
 vfprintf (stderr, fmt, args)
 
 va_end (args)
 
 Assert (fmt[strlen(fmt) - 1] !='\n')
 
 fprintf (stderr, "\n")
 
 exit (1)
 
static void confirm_query_canceled_impl (int line, PGconn *conn)
 
static void wait_for_connection_state (int line, PGconn *monitorConn, int procpid, char *state, char *event)
 
static void send_cancellable_query_impl (int line, PGconn *conn, PGconn *monitorConn)
 
static PGconn * copy_connection (PGconn *conn)
 
static void test_cancel (PGconn *conn)
 
static void test_disallowed_in_pipeline (PGconn *conn)
 
static void test_multi_pipelines (PGconn *conn)
 
static void test_nosync (PGconn *conn)
 
static void test_pipeline_abort (PGconn *conn)
 
static void test_pipelined_insert (PGconn *conn, int n_rows)
 
static void test_prepared (PGconn *conn)
 
static void notice_processor (void *arg, const char *message)
 
static void test_pipeline_idle (PGconn *conn)
 
static void test_simple_pipeline (PGconn *conn)
 
static void test_singlerowmode (PGconn *conn)
 
static void test_transaction (PGconn *conn)
 
static void test_uniqviol (PGconn *conn)
 
static void usage (const char *progname)
 
static void print_test_list (void)
 
int main (int argc, char **argv)
 

Variables

static void const char * fmt
 
const char *const progname = "libpq_pipeline"
 
char * tracefile = NULL
 
static const char *const drop_table_sql
 
static const char *const create_table_sql
 
static const char *const insert_sql
 
static const char *const insert_sql2
 

Macro Definition Documentation

◆ confirm_query_canceled

#define confirm_query_canceled (   conn)    confirm_query_canceled_impl(__LINE__, conn)

Definition at line 96 of file libpq_pipeline.c.

◆ MAXINT8LEN

#define MAXINT8LEN   20

Definition at line 58 of file libpq_pipeline.c.

◆ MAXINTLEN

#define MAXINTLEN   12

Definition at line 57 of file libpq_pipeline.c.

◆ pg_debug

#define pg_debug (   ...)

Definition at line 43 of file libpq_pipeline.c.

◆ pg_fatal

#define pg_fatal (   ...)    pg_fatal_impl(__LINE__, __VA_ARGS__)

Definition at line 75 of file libpq_pipeline.c.

◆ send_cancellable_query

#define send_cancellable_query (   conn,
  monitorConn 
)     send_cancellable_query_impl(__LINE__, conn, monitorConn)

Definition at line 172 of file libpq_pipeline.c.

Enumeration Type Documentation

◆ PipelineInsertStep

Enumerator
BI_BEGIN_TX 
BI_DROP_TABLE 
BI_CREATE_TABLE 
BI_PREPARE 
BI_INSERT_ROWS 
BI_COMMIT_TX 
BI_SYNC 
BI_DONE 

Definition at line 995 of file libpq_pipeline.c.

996 {
997  BI_BEGIN_TX,
998  BI_DROP_TABLE,
999  BI_CREATE_TABLE,
1000  BI_PREPARE,
1001  BI_INSERT_ROWS,
1002  BI_COMMIT_TX,
1003  BI_SYNC,
1004  BI_DONE,
1005 };
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX

Function Documentation

◆ Assert()

Assert ( fmt [strlen(fmt) - 1] != '\n')

◆ confirm_query_canceled_impl()

static void confirm_query_canceled_impl ( int  line,
PGconn *  conn 
)
static

Definition at line 98 of file libpq_pipeline.c.

99 {
100  PGresult *res = NULL;
101 
102  res = PQgetResult(conn);
103  if (res == NULL)
104  pg_fatal_impl(line, "PQgetResult returned null: %s",
105  PQerrorMessage(conn));
106  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
107  pg_fatal_impl(line, "query did not fail when it was expected");
108  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
109  pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
110  PQerrorMessage(conn));
111  PQclear(res);
112 
113  while (PQisBusy(conn))
114  PQconsumeInput(conn);
115 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:111
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
PGconn * conn
Definition: streamutil.c:55

References conn, PG_DIAG_SQLSTATE, PGRES_FATAL_ERROR, PQclear(), PQconsumeInput(), PQerrorMessage(), PQgetResult(), PQisBusy(), PQresultErrorField(), PQresultStatus(), and res.

◆ copy_connection()

static PGconn* copy_connection ( PGconn *  conn)
static

Definition at line 206 of file libpq_pipeline.c.

207 {
208  PGconn *copyConn;
209  PQconninfoOption *opts = PQconninfo(conn);
210  const char **keywords;
211  const char **vals;
212  int nopts = 1;
213  int i = 0;
214 
215  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
216  nopts++;
217 
218  keywords = pg_malloc(sizeof(char *) * nopts);
219  vals = pg_malloc(sizeof(char *) * nopts);
220 
221  for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
222  {
223  if (opt->val)
224  {
225  keywords[i] = opt->keyword;
226  vals[i] = opt->val;
227  i++;
228  }
229  }
230  keywords[i] = vals[i] = NULL;
231 
232  copyConn = PQconnectdbParams(keywords, vals, false);
233 
234  if (PQstatus(copyConn) != CONNECTION_OK)
235  pg_fatal("Connection to database failed: %s",
236  PQerrorMessage(copyConn));
237 
238  return copyConn;
239 }
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:689
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:6936
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7094
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:61
#define pg_fatal(...)
static AmcheckOptions opts
Definition: pg_amcheck.c:111

References conn, CONNECTION_OK, i, opts, pg_fatal, pg_malloc(), PQconnectdbParams(), PQconninfo(), PQerrorMessage(), and PQstatus().

Referenced by test_cancel().

◆ exit()

exit ( 1 )

Referenced by _check_database_version(), add_socket_to_set(), add_typedefs_from_file(), addtype(), adjleap(), adjust_data_dir(), append_database_pattern(), append_relation_pattern_helper(), append_schema_pattern(), appendPsqlMetaConnect(), appendQualifiedRelation(), appendShellString(), associate(), bail_out(), BaseBackup(), bbstreamer_zstd_compressor_new(), bootstrap_template1(), change_directory(), check_input(), check_locale_name(), check_ok(), check_prepare_conn(), check_publisher(), check_root(), check_subscriber(), check_testspec(), CheckConnection(), CheckDataVersion(), checkInitSteps(), close_cur1(), close_file(), cluster_one_database(), compile_database_list(), compile_relation_list_one_db(), connect_database(), connect_slot(), connectToServer(), create_data_directory(), create_xlog_or_symlink(), CreateBackupStreamer(), die_on_query_failure(), digestControlFile(), disconnect_database(), do_init(), do_kill(), do_logrotate(), do_promote(), do_reload(), do_restart(), do_start(), do_status(), do_stop(), dolink(), ecpg_filter_source(), ecpg_filter_stderr(), ecpg_start_test(), enlargeStringInfo(), ensureCleanShutdown(), err(), error(), errstart(), errx(), executeCommand(), executeQuery(), executeQueryOrDie(), executeStatement(), exit_nicely(), find_other_exec_or_die(), findBuiltin(), FindStreamingStart(), get_control_dbstate(), get_id(), get_opts(), get_pgpid(), get_record1(), get_restricted_token(), get_su_pwd(), get_user_name_or_exit(), get_var1(), GetConnection(), getfields(), GetTableInfo(), GucInfoMain(), handle_args(), handle_help_version_opts(), HandleStartupProcInterrupts(), infile(), Initialize(), isolation_init(), isolation_start_test(), leapadd(), libpqsrv_cancel(), main(), memory_exhausted(), mkdirs(), newabbr(), open_cur1(), open_walfile(), ParallelBackupStart(), parse_psql_options(), parseCommandLine(), parseServiceFile(), pg_ctl_status(), pg_fatal(), pg_log_v(), pg_malloc_internal(), pg_realloc(), pg_strdup(), pg_wcsformat(), 
pgfdw_get_cleanup_result(), pgwin32_is_admin(), pnstrdup(), postprocess_sql_command(), PQprint(), printTable(), printTableAddCell(), printTableAddHeader(), printTableInit(), proc_exit(), process_backslash_command(), process_directory_recursively(), psql_start_test(), pvsnprintf(), read_controlfile(), read_dumpall_filters(), read_post_opts(), regression_main(), reindex_one_database(), replace_percent_placeholders(), report_backup_error(), report_clusters_compatible(), report_fatal_error(), report_manifest_error(), rewind_parseTimeLineHistory(), rpytime(), rulesub(), run_command(), run_permutation(), run_reindex_command(), runInitSteps(), s_lock_stuck(), save_ps_display_args(), scan_for_existing_tablespaces(), search_directory(), set_mode(), set_option(), set_sig(), setup_data_file_paths(), setup_locale_encoding(), setup_pgdata(), setup_publisher(), SetWALFileNameForCleanup(), spawn_process(), sql_check(), sql_conn(), sql_exec(), start_postmaster(), StartLogStreamer(), startup_hacks(), StreamLog(), syntax_error(), test_strlower(), test_timing(), threadRun(), time_overflow(), TransferPredicateLocksToNewTarget(), try_complete_step(), usage(), vacuum_delay_point(), vacuum_one_database(), WalRcvWaitForStartPosition(), walsummary_error_callback(), and writezone().

◆ exit_nicely()

static void exit_nicely ( PGconn *  conn)
static

Definition at line 61 of file libpq_pipeline.c.

62 {
63  PQfinish(conn);
64  exit(1);
65 }
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4868
exit(1)

References conn, exit(), and PQfinish().

Referenced by main(), test_nosync(), and test_pipelined_insert().

◆ fflush()

static void const char fflush ( stdout  )

Referenced by adjust_data_dir(), bootstrap_template1(), check_ok(), cluster_all_databases(), create_data_directory(), create_xlog_or_symlink(), do_copy(), do_init(), do_pg_backup_stop(), do_shell(), do_watch(), dumpnfa(), echo_hidden_command(), ecpg_log(), editFile(), emit_tap_output_v(), ensureCleanShutdown(), errfinish(), ExceptionalCondition(), exec_command_print(), exec_command_prompt(), exec_command_write(), exec_prog(), ExecQueryAndProcessResults(), ExecQueryTuples(), ExecuteRecoveryCommand(), fork_process(), get_bin_version(), get_control_data(), get_prompt(), get_su_pwd(), gets_interactive(), handleCopyIn(), handleCopyOut(), HandleSlashCmds(), initialize_data_directory(), InteractiveBackend(), log_pre_callback(), main(), MainLoop(), OpenPipeStream(), openQueryOutputFile(), PageOutput(), parallel_exec_prog(), parallel_transfer_all_new_dbs(), ParallelBackupStart(), parse_required_wal(), perform_spin_delay(), pg_log_generic_v(), pg_log_v(), pg_regcomp(), pg_regexec(), pipe_read_line(), plpgsql_dumptree(), popen_check(), pprint(), PQdisplayTuples(), pqFlush(), PQprint(), PQuntrace(), print(), print_filemap(), print_msg(), PrintNotifications(), PrintQueryStatus(), PrintQueryTuples(), printVersion(), psql_end_command(), PSQLexec(), regression_main(), reindex_all_databases(), RestoreArchivedFile(), run_diff(), runPgDump(), runShellCommand(), SendQuery(), setup_config(), shell_archive_file(), simple_prompt_extended(), spawn_process(), start_postmaster(), stop_postmaster(), SysLogger_Start(), test_config_settings(), test_file_descriptor_sync(), test_non_sync(), test_open_sync(), test_specific_config_settings(), test_sync(), vacuum_one_database(), vacuumlo(), and write_stderr().

◆ fprintf() [1/2]

fprintf ( stderr  ,
"\n"   
)

◆ fprintf() [2/2]

fprintf ( stderr  ,
"\n%s:%d: "  ,
progname  ,
line   
)

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2166 of file libpq_pipeline.c.

2167 {
2168  const char *conninfo = "";
2169  PGconn *conn;
2170  FILE *trace;
2171  char *testname;
2172  int numrows = 10000;
2173  PGresult *res;
2174  int c;
2175 
2176  while ((c = getopt(argc, argv, "r:t:")) != -1)
2177  {
2178  switch (c)
2179  {
2180  case 'r': /* numrows */
2181  errno = 0;
2182  numrows = strtol(optarg, NULL, 10);
2183  if (errno != 0 || numrows <= 0)
2184  {
2185  fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2186  optarg);
2187  exit(1);
2188  }
2189  break;
2190  case 't': /* trace file */
2191  tracefile = pg_strdup(optarg);
2192  break;
2193  }
2194  }
2195 
2196  if (optind < argc)
2197  {
2198  testname = pg_strdup(argv[optind]);
2199  optind++;
2200  }
2201  else
2202  {
2203  usage(argv[0]);
2204  exit(1);
2205  }
2206 
2207  if (strcmp(testname, "tests") == 0)
2208  {
2209  print_test_list();
2210  exit(0);
2211  }
2212 
2213  if (optind < argc)
2214  {
2215  conninfo = pg_strdup(argv[optind]);
2216  optind++;
2217  }
2218 
2219  /* Make a connection to the database */
2220  conn = PQconnectdb(conninfo);
2221  if (PQstatus(conn) != CONNECTION_OK)
2222  {
2223  fprintf(stderr, "Connection to database failed: %s\n",
2224  PQerrorMessage(conn));
2225  exit_nicely(conn);
2226  }
2227 
2228  res = PQexec(conn, "SET lc_messages TO \"C\"");
2229  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2230  pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
2231  res = PQexec(conn, "SET debug_parallel_query = off");
2232  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2233  pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
2234 
2235  /* Set the trace file, if requested */
2236  if (tracefile != NULL)
2237  {
2238  if (strcmp(tracefile, "-") == 0)
2239  trace = stdout;
2240  else
2241  trace = fopen(tracefile, "w");
2242  if (trace == NULL)
2243  pg_fatal("could not open file \"%s\": %m", tracefile);
2244 
2245  /* Make it line-buffered */
2246  setvbuf(trace, NULL, PG_IOLBF, 0);
2247 
2248  PQtrace(conn, trace);
2249  PQsetTraceFlags(conn,
2250  PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2251  }
2252 
2253  if (strcmp(testname, "cancel") == 0)
2254  test_cancel(conn);
2255  else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2256  test_disallowed_in_pipeline(conn);
2257  else if (strcmp(testname, "multi_pipelines") == 0)
2258  test_multi_pipelines(conn);
2259  else if (strcmp(testname, "nosync") == 0)
2260  test_nosync(conn);
2261  else if (strcmp(testname, "pipeline_abort") == 0)
2262  test_pipeline_abort(conn);
2263  else if (strcmp(testname, "pipeline_idle") == 0)
2264  test_pipeline_idle(conn);
2265  else if (strcmp(testname, "pipelined_insert") == 0)
2266  test_pipelined_insert(conn, numrows);
2267  else if (strcmp(testname, "prepared") == 0)
2268  test_prepared(conn);
2269  else if (strcmp(testname, "simple_pipeline") == 0)
2270  test_simple_pipeline(conn);
2271  else if (strcmp(testname, "singlerow") == 0)
2272  test_singlerowmode(conn);
2273  else if (strcmp(testname, "transaction") == 0)
2274  test_transaction(conn);
2275  else if (strcmp(testname, "uniqviol") == 0)
2276  test_uniqviol(conn);
2277  else
2278  {
2279  fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2280  exit(1);
2281  }
2282 
2283  /* close the connection to the database and cleanup */
2284  PQfinish(conn);
2285  return 0;
2286 }
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:744
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:442
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:444
static void usage(const char *progname)
static void print_test_list(void)
static void exit_nicely(PGconn *conn)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
char * tracefile
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static void test_nosync(PGconn *conn)
static void test_pipeline_abort(PGconn *conn)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
PGDLLIMPORT int optind
Definition: getopt.c:50
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:71
PGDLLIMPORT char * optarg
Definition: getopt.c:52
#define PG_IOLBF
Definition: port.h:361
char * c

References conn, CONNECTION_OK, exit(), exit_nicely(), fprintf(), getopt(), optarg, optind, pg_fatal, PG_IOLBF, pg_strdup(), PGRES_COMMAND_OK, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultStatus(), PQsetTraceFlags(), PQstatus(), PQtrace(), PQTRACE_REGRESS_MODE, PQTRACE_SUPPRESS_TIMESTAMPS, print_test_list(), res, generate_unaccent_rules::stdout, test_cancel(), test_disallowed_in_pipeline(), test_multi_pipelines(), test_nosync(), test_pipeline_abort(), test_pipeline_idle(), test_pipelined_insert(), test_prepared(), test_simple_pipeline(), test_singlerowmode(), test_transaction(), test_uniqviol(), tracefile, and usage().

◆ notice_processor()

static void notice_processor ( void *  arg,
const char *  message 
)
static

Definition at line 1413 of file libpq_pipeline.c.

1414 {
1415  int *n_notices = (int *) arg;
1416 
1417  (*n_notices)++;
1418  fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1419 }
void * arg

References arg, and fprintf().

Referenced by test_pipeline_idle().

◆ pg_attribute_noreturn()

static void pg_attribute_noreturn ( )
static

◆ pg_attribute_printf()

static void const char pg_attribute_printf ( 2 ,
3 
)

◆ print_test_list()

static void print_test_list ( void  )
static

Definition at line 2149 of file libpq_pipeline.c.

2150 {
2151  printf("cancel\n");
2152  printf("disallowed_in_pipeline\n");
2153  printf("multi_pipelines\n");
2154  printf("nosync\n");
2155  printf("pipeline_abort\n");
2156  printf("pipeline_idle\n");
2157  printf("pipelined_insert\n");
2158  printf("prepared\n");
2159  printf("simple_pipeline\n");
2160  printf("singlerow\n");
2161  printf("transaction\n");
2162  printf("uniqviol\n");
2163 }
#define printf(...)
Definition: port.h:244

References printf.

Referenced by main().

◆ process_result()

static bool process_result ( PGconn *  conn,
PGresult *  res,
int  results,
int  numsent 
)
static

Definition at line 2089 of file libpq_pipeline.c.

2090 {
2091  PGresult *res2;
2092  bool got_error = false;
2093 
2094  if (res == NULL)
2095  pg_fatal("got unexpected NULL");
2096 
2097  switch (PQresultStatus(res))
2098  {
2099  case PGRES_FATAL_ERROR:
2100  got_error = true;
2101  fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2102  PQclear(res);
2103 
2104  res2 = PQgetResult(conn);
2105  if (res2 != NULL)
2106  pg_fatal("expected NULL, got %s",
2107  PQresStatus(PQresultStatus(res2)));
2108  break;
2109 
2110  case PGRES_TUPLES_OK:
2111  fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2112  PQclear(res);
2113 
2114  res2 = PQgetResult(conn);
2115  if (res2 != NULL)
2116  pg_fatal("expected NULL, got %s",
2117  PQresStatus(PQresultStatus(res2)));
2118  break;
2119 
2120  case PGRES_PIPELINE_ABORTED:
2121  fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2122  res2 = PQgetResult(conn);
2123  if (res2 != NULL)
2124  pg_fatal("expected NULL, got %s",
2125  PQresStatus(PQresultStatus(res2)));
2126  break;
2127 
2128  default:
2129  pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2130  }
2131 
2132  return got_error;
2133 }
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3419
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:115
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103

References conn, fprintf(), pg_fatal, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetResult(), PQgetvalue(), PQresStatus(), PQresultStatus(), and res.

Referenced by test_uniqviol().

◆ send_cancellable_query_impl()

static void send_cancellable_query_impl ( int  line,
PGconn *  conn,
PGconn *  monitorConn 
)
static

Definition at line 175 of file libpq_pipeline.c.

176 {
177  const char *env_wait;
178  const Oid paramTypes[1] = {INT4OID};
179 
180  /*
181  * Wait for the connection to be idle, so that our check for an active
182  * connection below is reliable, instead of possibly seeing an outdated
183  * state.
184  */
185  wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
186 
187  env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
188  if (env_wait == NULL)
189  env_wait = "180";
190 
191  if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
192  &env_wait, NULL, NULL, 0) != 1)
193  pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
194 
195  /*
196  * Wait for the sleep to be active, because if the query is not running
197  * yet, the cancel request that we send won't have any effect.
198  */
199  wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
200 }
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7181
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
unsigned int Oid
Definition: postgres_ext.h:31

References conn, PQbackendPID(), PQerrorMessage(), PQsendQueryParams(), and wait_for_connection_state().

◆ test_cancel()

static void test_cancel ( PGconn *  conn)
static

Definition at line 245 of file libpq_pipeline.c.

246 {
247  PGcancel *cancel;
248  PGcancelConn *cancelConn;
249  PGconn *monitorConn;
250  char errorbuf[256];
251 
252  fprintf(stderr, "test cancellations... ");
253 
254  if (PQsetnonblocking(conn, 1) != 0)
255  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
256 
257  /*
258  * Make a separate connection to the database to monitor the query on the
259  * main connection.
260  */
261  monitorConn = copy_connection(conn);
262  Assert(PQstatus(monitorConn) == CONNECTION_OK);
263 
264  /* test PQcancel */
265  send_cancellable_query(conn, monitorConn);
266  cancel = PQgetCancel(conn);
267  if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
268  pg_fatal("failed to run PQcancel: %s", errorbuf);
269  confirm_query_canceled(conn);
270 
271  /* PGcancel object can be reused for the next query */
272  send_cancellable_query(conn, monitorConn);
273  if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
274  pg_fatal("failed to run PQcancel: %s", errorbuf);
275  confirm_query_canceled(conn);
276 
277  PQfreeCancel(cancel);
278 
279  /* test PQrequestCancel */
280  send_cancellable_query(conn, monitorConn);
281  if (!PQrequestCancel(conn))
282  pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
283  confirm_query_canceled(conn);
284 
285  /* test PQcancelBlocking */
286  send_cancellable_query(conn, monitorConn);
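287  cancelConn = PQcancelCreate(conn);
288  if (!PQcancelBlocking(cancelConn))
289  pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
290  confirm_query_canceled(conn);
291  PQcancelFinish(cancelConn);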
292 
293  /* test PQcancelCreate and then polling with PQcancelPoll */
294  send_cancellable_query(conn, monitorConn);
295  cancelConn = PQcancelCreate(conn);
296  if (!PQcancelStart(cancelConn))
297  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
298  while (true)
299  {
300  struct timeval tv;
301  fd_set input_mask;
302  fd_set output_mask;
303  PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
304  int sock = PQcancelSocket(cancelConn);
305 
306  if (pollres == PGRES_POLLING_OK)
307  break;
308 
309  FD_ZERO(&input_mask);
310  FD_ZERO(&output_mask);
311  switch (pollres)
312  {
313  case PGRES_POLLING_READING:
314  pg_debug("polling for reads\n");
315  FD_SET(sock, &input_mask);
316  break;
317  case PGRES_POLLING_WRITING:
318  pg_debug("polling for writes\n");
319  FD_SET(sock, &output_mask);
320  break;
321  default:
322  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
323  }
324 
325  if (sock < 0)
326  pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
327 
328  tv.tv_sec = 3;
329  tv.tv_usec = 0;
330 
331  while (true)
332  {
333  if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
334  {
335  if (errno == EINTR)
336  continue;
337  pg_fatal("select() failed: %m");
338  }
339  break;
340  }
341  }
342  if (PQcancelStatus(cancelConn) != CONNECTION_OK)
343  pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
344  confirm_query_canceled(conn);
345 
346  /*
347  * test PQcancelReset works on the cancel connection and it can be reused
348  * afterwards
349  */
350  PQcancelReset(cancelConn);
351 
352  send_cancellable_query(conn, monitorConn);
353  if (!PQcancelStart(cancelConn))
354  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
355  while (true)
356  {
357  struct timeval tv;
358  fd_set input_mask;
359  fd_set output_mask;
360  PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
361  int sock = PQcancelSocket(cancelConn);
362 
363  if (pollres == PGRES_POLLING_OK)
364  break;
365 
366  FD_ZERO(&input_mask);
367  FD_ZERO(&output_mask);
368  switch (pollres)
369  {
370  case PGRES_POLLING_READING:
371  pg_debug("polling for reads\n");
372  FD_SET(sock, &input_mask);
373  break;
374  case PGRES_POLLING_WRITING:
375  pg_debug("polling for writes\n");
376  FD_SET(sock, &output_mask);
377  break;
378  default:
379  pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
380  }
381 
382  if (sock < 0)
383  pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
384 
385  tv.tv_sec = 3;
386  tv.tv_usec = 0;
387 
388  while (true)
389  {
390  if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
391  {
392  if (errno == EINTR)
393  continue;
394  pg_fatal("select() failed: %m");
395  }
396  break;
397  }
398  }
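399  if (PQcancelStatus(cancelConn) != CONNECTION_OK)
400  pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
401  confirm_query_canceled(conn);
402 
403  PQcancelFinish(cancelConn);
404 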
405  fprintf(stderr, "ok\n");
406 }
static PGcancel *volatile cancelConn
Definition: cancel.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:348
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:317
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:283
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:171
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:462
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:305
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:333
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:660
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:416
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3944
PostgresPollingStatusType
Definition: libpq-fe.h:89
@ PGRES_POLLING_OK
Definition: libpq-fe.h:93
@ PGRES_POLLING_READING
Definition: libpq-fe.h:91
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:92
#define confirm_query_canceled(conn)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_debug(...)
static PGconn * copy_connection(PGconn *conn)
#define send_cancellable_query(conn, monitorConn)
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495

References Assert(), cancelConn, confirm_query_canceled, conn, CONNECTION_OK, copy_connection(), EINTR, fprintf(), pg_debug, pg_fatal, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PQcancel(), PQcancelBlocking(), PQcancelCreate(), PQcancelErrorMessage(), PQcancelFinish(), PQcancelPoll(), PQcancelReset(), PQcancelSocket(), PQcancelStart(), PQcancelStatus(), PQerrorMessage(), PQfreeCancel(), PQgetCancel(), PQrequestCancel(), PQsetnonblocking(), PQstatus(), select, and send_cancellable_query.

Referenced by main().

◆ test_disallowed_in_pipeline()

static void test_disallowed_in_pipeline ( PGconn *  conn)
static

Definition at line 409 of file libpq_pipeline.c.

410 {
411  PGresult *res = NULL;
412 
413  fprintf(stderr, "test error cases... ");
414 
415  if (PQisnonblocking(conn))
416  pg_fatal("Expected blocking connection mode");
417 
418  if (PQenterPipelineMode(conn) != 1)
419  pg_fatal("Unable to enter pipeline mode");
420 
421  if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
422  pg_fatal("Pipeline mode not activated properly");
423 
424  /* PQexec should fail in pipeline mode */
425  res = PQexec(conn, "SELECT 1");
426  if (PQresultStatus(res) != PGRES_FATAL_ERROR)
427  pg_fatal("PQexec should fail in pipeline mode but succeeded");
428  if (strcmp(PQerrorMessage(conn),
429  "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
430  pg_fatal("did not get expected error message; got: \"%s\"",
431  PQerrorMessage(conn));
432 
433  /* PQsendQuery should fail in pipeline mode */
434  if (PQsendQuery(conn, "SELECT 1") != 0)
435  pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
436  if (strcmp(PQerrorMessage(conn),
437  "PQsendQuery not allowed in pipeline mode\n") != 0)
438  pg_fatal("did not get expected error message; got: \"%s\"",
439  PQerrorMessage(conn));
440 
441  /* Entering pipeline mode when already in pipeline mode is OK */
442  if (PQenterPipelineMode(conn) != 1)
443  pg_fatal("re-entering pipeline mode should be a no-op but failed");
444 
445  if (PQisBusy(conn) != 0)
446  pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
447 
448  /* ok, back to normal command mode */
449  if (PQexitPipelineMode(conn) != 1)
450  pg_fatal("couldn't exit idle empty pipeline mode");
451 
452  if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
453  pg_fatal("Pipeline mode not terminated properly");
454 
455  /* exiting pipeline mode when not in pipeline mode should be a no-op */
456  if (PQexitPipelineMode(conn) != 1)
457  pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
458 
459  /* can now PQexec again */
460  res = PQexec(conn, "SELECT 1");
461  if (PQresultStatus(res) != PGRES_TUPLES_OK)
462  pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
463  PQerrorMessage(conn));
464 
465  fprintf(stderr, "ok\n");
466 }
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7189
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3073
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3042
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3983
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:162

References conn, fprintf(), pg_fatal, PGRES_FATAL_ERROR, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQisBusy(), PQisnonblocking(), PQpipelineStatus(), PQresultStatus(), PQsendQuery(), and res.

Referenced by main().

◆ test_multi_pipelines()

static void test_multi_pipelines ( PGconn * conn)
static

Definition at line 469 of file libpq_pipeline.c.

470 {
471  PGresult *res = NULL;
472  const char *dummy_params[1] = {"1"};
473  Oid dummy_param_oids[1] = {INT4OID};
474 
475  fprintf(stderr, "multi pipeline... ");
476 
477  /*
478  * Queue up a couple of small pipelines and process each without returning
479  * to command mode first.
480  */
481  if (PQenterPipelineMode(conn) != 1)
482  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
483 
484  /* first pipeline */
485  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
486  dummy_params, NULL, NULL, 0) != 1)
487  pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
488 
489  if (PQpipelineSync(conn) != 1)
490  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
491 
492  /* second pipeline */
493  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
494  dummy_params, NULL, NULL, 0) != 1)
495  pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
496 
497  /* Skip flushing once. */
498  if (PQsendPipelineSync(conn) != 1)
499  pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
500 
501  /* third pipeline */
502  if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
503  dummy_params, NULL, NULL, 0) != 1)
504  pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
505 
506  if (PQpipelineSync(conn) != 1)
507  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
508 
509  /* OK, start processing the results */
510 
511  /* first pipeline */
512 
513  res = PQgetResult(conn);
514  if (res == NULL)
515  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
517 
519  pg_fatal("Unexpected result code %s from first pipeline item",
521  PQclear(res);
522  res = NULL;
523 
524  if (PQgetResult(conn) != NULL)
525  pg_fatal("PQgetResult returned something extra after first result");
526 
527  if (PQexitPipelineMode(conn) != 0)
528  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
529 
530  res = PQgetResult(conn);
531  if (res == NULL)
532  pg_fatal("PQgetResult returned null when sync result expected: %s",
534 
536  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
538  PQclear(res);
539 
540  /* second pipeline */
541 
542  res = PQgetResult(conn);
543  if (res == NULL)
544  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
546 
548  pg_fatal("Unexpected result code %s from second pipeline item",
550  PQclear(res);
551  res = NULL;
552 
553  if (PQgetResult(conn) != NULL)
554  pg_fatal("PQgetResult returned something extra after first result");
555 
556  if (PQexitPipelineMode(conn) != 0)
557  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
558 
559  res = PQgetResult(conn);
560  if (res == NULL)
561  pg_fatal("PQgetResult returned null when sync result expected: %s",
563 
565  pg_fatal("Unexpected result code %s instead of sync result, error: %s",
567  PQclear(res);
568 
569  /* third pipeline */
570 
571  res = PQgetResult(conn);
572  if (res == NULL)
573  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
575 
577  pg_fatal("Unexpected result code %s from third pipeline item",
579 
580  res = PQgetResult(conn);
581  if (res != NULL)
582  pg_fatal("Expected null result, got %s",
584 
585  res = PQgetResult(conn);
586  if (res == NULL)
587  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
589 
591  pg_fatal("Unexpected result code %s from second pipeline sync",
593 
594  /* We're still in pipeline mode ... */
596  pg_fatal("Fell out of pipeline mode somehow");
597 
598  /* until we end it, which we can safely do now */
599  if (PQexitPipelineMode(conn) != 1)
600  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
602 
604  pg_fatal("exiting pipeline mode didn't seem to work");
605 
606  fprintf(stderr, "ok\n");
607 }
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3282
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3272
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:114

References conn, fprintf(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPipelineSync(), PQsendQueryParams(), and res.

Referenced by main().

◆ test_nosync()

static void test_nosync ( PGconn * conn)
static

Definition at line 614 of file libpq_pipeline.c.

615 {
616  int numqueries = 10;
617  int results = 0;
618  int sock = PQsocket(conn);
619 
620  fprintf(stderr, "nosync... ");
621 
622  if (sock < 0)
623  pg_fatal("invalid socket");
624 
625  if (PQenterPipelineMode(conn) != 1)
626  pg_fatal("could not enter pipeline mode");
627  for (int i = 0; i < numqueries; i++)
628  {
629  fd_set input_mask;
630  struct timeval tv;
631 
632  if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
633  0, NULL, NULL, NULL, NULL, 0) != 1)
634  pg_fatal("error sending select: %s", PQerrorMessage(conn));
635  PQflush(conn);
636 
637  /*
638  * If the server has written anything to us, read (some of) it now.
639  */
640  FD_ZERO(&input_mask);
641  FD_SET(sock, &input_mask);
642  tv.tv_sec = 0;
643  tv.tv_usec = 0;
644  if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
645  {
646  fprintf(stderr, "select() failed: %m\n");
647  exit_nicely(conn);
648  }
649  if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
650  pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
651  }
652 
653  /* tell server to flush its output buffer */
654  if (PQsendFlushRequest(conn) != 1)
655  pg_fatal("failed to send flush request");
656  PQflush(conn);
657 
658  /* Now read all results */
659  for (;;)
660  {
661  PGresult *res;
662 
663  res = PQgetResult(conn);
664 
665  /* NULL results are only expected after TUPLES_OK */
666  if (res == NULL)
667  pg_fatal("got unexpected NULL result after %d results", results);
668 
669  /* We expect exactly one TUPLES_OK result for each query we sent */
671  {
672  PGresult *res2;
673 
674  /* and one NULL result should follow each */
675  res2 = PQgetResult(conn);
676  if (res2 != NULL)
677  pg_fatal("expected NULL, got %s",
678  PQresStatus(PQresultStatus(res2)));
679  PQclear(res);
680  results++;
681 
682  /* if we're done, we're done */
683  if (results == numqueries)
684  break;
685 
686  continue;
687  }
688 
689  /* anything else is unexpected */
690  pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
691  }
692 
693  fprintf(stderr, "ok\n");
694 }
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7173
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3371

References conn, exit_nicely(), fprintf(), i, pg_fatal, PGRES_TUPLES_OK, PQclear(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQflush(), PQgetResult(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsocket(), res, and select.

Referenced by main().

◆ test_pipeline_abort()

static void test_pipeline_abort ( PGconn * conn)
static

Definition at line 706 of file libpq_pipeline.c.

707 {
708  PGresult *res = NULL;
709  const char *dummy_params[1] = {"1"};
710  Oid dummy_param_oids[1] = {INT4OID};
711  int i;
712  int gotrows;
713  bool goterror;
714 
715  fprintf(stderr, "aborted pipeline... ");
716 
719  pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
720 
723  pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
724 
725  /*
726  * Queue up a couple of small pipelines and process each without returning
727  * to command mode first. Make sure the second operation in the first
728  * pipeline ERRORs.
729  */
730  if (PQenterPipelineMode(conn) != 1)
731  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
732 
733  dummy_params[0] = "1";
734  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
735  dummy_params, NULL, NULL, 0) != 1)
736  pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
737 
738  if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
739  1, dummy_param_oids, dummy_params,
740  NULL, NULL, 0) != 1)
741  pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
742 
743  dummy_params[0] = "2";
744  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
745  dummy_params, NULL, NULL, 0) != 1)
746  pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
747 
748  if (PQpipelineSync(conn) != 1)
749  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
750 
751  dummy_params[0] = "3";
752  if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
753  dummy_params, NULL, NULL, 0) != 1)
754  pg_fatal("dispatching second-pipeline insert failed: %s",
756 
757  if (PQpipelineSync(conn) != 1)
758  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
759 
760  /*
761  * OK, start processing the pipeline results.
762  *
763  * We should get a command-ok for the first query, then a fatal error and
764  * a pipeline aborted message for the second insert, a pipeline-end, then
765  * a command-ok and a pipeline-ok for the second pipeline operation.
766  */
767  res = PQgetResult(conn);
768  if (res == NULL)
769  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
771  pg_fatal("Unexpected result status %s: %s",
774  PQclear(res);
775 
776  /* NULL result to signal end-of-results for this command */
777  if ((res = PQgetResult(conn)) != NULL)
778  pg_fatal("Expected null result, got %s",
780 
781  /* Second query caused error, so we expect an error next */
782  res = PQgetResult(conn);
783  if (res == NULL)
784  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
786  pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
788  PQclear(res);
789 
790  /* NULL result to signal end-of-results for this command */
791  if ((res = PQgetResult(conn)) != NULL)
792  pg_fatal("Expected null result, got %s",
794 
795  /*
796  * pipeline should now be aborted.
797  *
798  * Note that we could still queue more queries at this point if we wanted;
799  * they'd get added to a new third pipeline since we've already sent a
800  * second. The aborted flag relates only to the pipeline being received.
801  */
803  pg_fatal("pipeline should be flagged as aborted but isn't");
804 
805  /* third query in pipeline, the second insert */
806  res = PQgetResult(conn);
807  if (res == NULL)
808  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
810  pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
812  PQclear(res);
813 
814  /* NULL result to signal end-of-results for this command */
815  if ((res = PQgetResult(conn)) != NULL)
816  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
817 
819  pg_fatal("pipeline should be flagged as aborted but isn't");
820 
821  /* Ensure we're still in pipeline */
823  pg_fatal("Fell out of pipeline mode somehow");
824 
825  /*
826  * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
827  *
828  * (This is so clients know to start processing results normally again and
829  * can tell the difference between skipped commands and the sync.)
830  */
831  res = PQgetResult(conn);
832  if (res == NULL)
833  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
835  pg_fatal("Unexpected result code from first pipeline sync\n"
836  "Expected PGRES_PIPELINE_SYNC, got %s",
838  PQclear(res);
839 
841  pg_fatal("sync should've cleared the aborted flag but didn't");
842 
843  /* We're still in pipeline mode... */
845  pg_fatal("Fell out of pipeline mode somehow");
846 
847  /* the insert from the second pipeline */
848  res = PQgetResult(conn);
849  if (res == NULL)
850  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
852  pg_fatal("Unexpected result code %s from first item in second pipeline",
854  PQclear(res);
855 
856  /* Read the NULL result at the end of the command */
857  if ((res = PQgetResult(conn)) != NULL)
858  pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
859 
860  /* the second pipeline sync */
861  if ((res = PQgetResult(conn)) == NULL)
862  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
864  pg_fatal("Unexpected result code %s from second pipeline sync",
866  PQclear(res);
867 
868  if ((res = PQgetResult(conn)) != NULL)
869  pg_fatal("Expected null result, got %s: %s",
872 
873  /* Try to send two queries in one command */
874  if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
875  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
876  if (PQpipelineSync(conn) != 1)
877  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
878  goterror = false;
879  while ((res = PQgetResult(conn)) != NULL)
880  {
881  switch (PQresultStatus(res))
882  {
883  case PGRES_FATAL_ERROR:
884  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
885  pg_fatal("expected error about multiple commands, got %s",
887  printf("got expected %s", PQerrorMessage(conn));
888  goterror = true;
889  break;
890  default:
891  pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
892  break;
893  }
894  }
895  if (!goterror)
896  pg_fatal("did not get cannot-insert-multiple-commands error");
897  res = PQgetResult(conn);
898  if (res == NULL)
899  pg_fatal("got NULL result");
901  pg_fatal("Unexpected result code %s from pipeline sync",
903  fprintf(stderr, "ok\n");
904 
905  /* Test single-row mode with an error partways */
906  if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
907  0, NULL, NULL, NULL, NULL, 0) != 1)
908  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
909  if (PQpipelineSync(conn) != 1)
910  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
912  goterror = false;
913  gotrows = 0;
914  while ((res = PQgetResult(conn)) != NULL)
915  {
916  switch (PQresultStatus(res))
917  {
918  case PGRES_SINGLE_TUPLE:
919  printf("got row: %s\n", PQgetvalue(res, 0, 0));
920  gotrows++;
921  break;
922  case PGRES_FATAL_ERROR:
923  if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
924  pg_fatal("expected division-by-zero, got: %s (%s)",
927  printf("got expected division-by-zero\n");
928  goterror = true;
929  break;
930  default:
931  pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
932  }
933  PQclear(res);
934  }
935  if (!goterror)
936  pg_fatal("did not get division-by-zero error");
937  if (gotrows != 3)
938  pg_fatal("did not get three rows");
939  /* the third pipeline sync */
940  if ((res = PQgetResult(conn)) == NULL)
941  pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
943  pg_fatal("Unexpected result code %s from third pipeline sync",
945  PQclear(res);
946 
947  /* We're still in pipeline mode... */
949  pg_fatal("Fell out of pipeline mode somehow");
950 
951  /* until we end it, which we can safely do now */
952  if (PQexitPipelineMode(conn) != 1)
953  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
955 
957  pg_fatal("exiting pipeline mode didn't seem to work");
958 
959  /*-
960  * Since we fired the pipelines off without a surrounding xact, the results
961  * should be:
962  *
963  * - Implicit xact started by server around 1st pipeline
964  * - First insert applied
965  * - Second statement aborted xact
966  * - Third insert skipped
967  * - Sync rolled back first implicit xact
968  * - Implicit xact created by server around 2nd pipeline
969  * - insert applied from 2nd pipeline
970  * - Sync commits 2nd xact
971  *
972  * So we should only have the value 3 that we inserted.
973  */
974  res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
975 
977  pg_fatal("Expected tuples, got %s: %s",
979  if (PQntuples(res) != 1)
980  pg_fatal("expected 1 result, got %d", PQntuples(res));
981  for (i = 0; i < PQntuples(res); i++)
982  {
983  const char *val = PQgetvalue(res, i, 0);
984 
985  if (strcmp(val, "3") != 0)
986  pg_fatal("expected only insert with value 3, got %s", val);
987  }
988 
989  PQclear(res);
990 
991  fprintf(stderr, "ok\n");
992 }
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
long val
Definition: informix.c:670
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:113
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:164
static const char *const create_table_sql
static const char *const insert_sql
static const char *const drop_table_sql

References conn, create_table_sql, drop_table_sql, fprintf(), i, insert_sql, PG_DIAG_SQLSTATE, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQ_PIPELINE_ABORTED, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultErrorField(), PQresultErrorMessage(), PQresultStatus(), PQsendQueryParams(), PQsetSingleRowMode(), printf, res, and val.

Referenced by main().

◆ test_pipeline_idle()

static void test_pipeline_idle ( PGconn * conn)
static

Definition at line 1423 of file libpq_pipeline.c.

1424 {
1425  PGresult *res;
1426  int n_notices = 0;
1427 
1428  fprintf(stderr, "\npipeline idle...\n");
1429 
1431 
1432  /* Try to exit pipeline mode in pipeline-idle state */
1433  if (PQenterPipelineMode(conn) != 1)
1434  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1435  if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1436  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1438  res = PQgetResult(conn);
1439  if (res == NULL)
1440  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441  PQerrorMessage(conn));
1443  pg_fatal("unexpected result code %s from first pipeline item",
1445  PQclear(res);
1446  res = PQgetResult(conn);
1447  if (res != NULL)
1448  pg_fatal("did not receive terminating NULL");
1449  if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1450  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1451  if (PQexitPipelineMode(conn) == 1)
1452  pg_fatal("exiting pipeline succeeded when it shouldn't");
1453  if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1454  strlen("cannot exit pipeline mode")) != 0)
1455  pg_fatal("did not get expected error; got: %s",
1456  PQerrorMessage(conn));
1458  res = PQgetResult(conn);
1460  pg_fatal("unexpected result code %s from second pipeline item",
1462  PQclear(res);
1463  res = PQgetResult(conn);
1464  if (res != NULL)
1465  pg_fatal("did not receive terminating NULL");
1466  if (PQexitPipelineMode(conn) != 1)
1467  pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1468 
1469  if (n_notices > 0)
1470  pg_fatal("got %d notice(s)", n_notices);
1471  fprintf(stderr, "ok - 1\n");
1472 
1473  /* Have a WARNING in the middle of a resultset */
1474  if (PQenterPipelineMode(conn) != 1)
1475  pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1476  if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1477  pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1479  res = PQgetResult(conn);
1480  if (res == NULL)
1481  pg_fatal("unexpected NULL result received");
1483  pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1484  if (PQexitPipelineMode(conn) != 1)
1485  pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1486  fprintf(stderr, "ok - 2\n");
1487 }
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7326
static void notice_processor(void *arg, const char *message)

References conn, fprintf(), notice_processor(), pg_fatal, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsetNoticeProcessor(), and res.

Referenced by main().

◆ test_pipelined_insert()

static void test_pipelined_insert ( PGconn * conn,
int  n_rows 
)
static

Definition at line 1008 of file libpq_pipeline.c.

1009 {
1010  Oid insert_param_oids[2] = {INT4OID, INT8OID};
1011  const char *insert_params[2];
1012  char insert_param_0[MAXINTLEN];
1013  char insert_param_1[MAXINT8LEN];
1014  enum PipelineInsertStep send_step = BI_BEGIN_TX,
1015  recv_step = BI_BEGIN_TX;
1016  int rows_to_send,
1017  rows_to_receive;
1018 
1019  insert_params[0] = insert_param_0;
1020  insert_params[1] = insert_param_1;
1021 
1022  rows_to_send = rows_to_receive = n_rows;
1023 
1024  /*
1025  * Do a pipelined insert into a table created at the start of the pipeline
1026  */
1027  if (PQenterPipelineMode(conn) != 1)
1028  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1029 
1030  while (send_step != BI_PREPARE)
1031  {
1032  const char *sql;
1033 
1034  switch (send_step)
1035  {
1036  case BI_BEGIN_TX:
1037  sql = "BEGIN TRANSACTION";
1038  send_step = BI_DROP_TABLE;
1039  break;
1040 
1041  case BI_DROP_TABLE:
1042  sql = drop_table_sql;
1043  send_step = BI_CREATE_TABLE;
1044  break;
1045 
1046  case BI_CREATE_TABLE:
1047  sql = create_table_sql;
1048  send_step = BI_PREPARE;
1049  break;
1050 
1051  default:
1052  pg_fatal("invalid state");
1053  sql = NULL; /* keep compiler quiet */
1054  }
1055 
1056  pg_debug("sending: %s\n", sql);
1057  if (PQsendQueryParams(conn, sql,
1058  0, NULL, NULL, NULL, NULL, 0) != 1)
1059  pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1060  }
1061 
1062  Assert(send_step == BI_PREPARE);
1063  pg_debug("sending: %s\n", insert_sql2);
1064  if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1065  pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1066  send_step = BI_INSERT_ROWS;
1067 
1068  /*
1069  * Now we start inserting. We'll be sending enough data that we could fill
1070  * our output buffer, so to avoid deadlocking we need to enter nonblocking
1071  * mode and consume input while we send more output. As results of each
1072  * query are processed we should pop them to allow processing of the next
1073  * query. There's no need to finish the pipeline before processing
1074  * results.
1075  */
1076  if (PQsetnonblocking(conn, 1) != 0)
1077  pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1078 
1079  while (recv_step != BI_DONE)
1080  {
1081  int sock;
1082  fd_set input_mask;
1083  fd_set output_mask;
1084 
1085  sock = PQsocket(conn);
1086 
1087  if (sock < 0)
1088  break; /* shouldn't happen */
1089 
1090  FD_ZERO(&input_mask);
1091  FD_SET(sock, &input_mask);
1092  FD_ZERO(&output_mask);
1093  FD_SET(sock, &output_mask);
1094 
1095  if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1096  {
1097  fprintf(stderr, "select() failed: %m\n");
1098  exit_nicely(conn);
1099  }
1100 
1101  /*
1102  * Process any results, so we keep the server's output buffer free
1103  * flowing and it can continue to process input
1104  */
1105  if (FD_ISSET(sock, &input_mask))
1106  {
1108 
1109  /* Read until we'd block if we tried to read */
1110  while (!PQisBusy(conn) && recv_step < BI_DONE)
1111  {
1112  PGresult *res;
1113  const char *cmdtag = "";
1114  const char *description = "";
1115  int status;
1116 
1117  /*
1118  * Read next result. If no more results from this query,
1119  * advance to the next query
1120  */
1121  res = PQgetResult(conn);
1122  if (res == NULL)
1123  continue;
1124 
1125  status = PGRES_COMMAND_OK;
1126  switch (recv_step)
1127  {
1128  case BI_BEGIN_TX:
1129  cmdtag = "BEGIN";
1130  recv_step++;
1131  break;
1132  case BI_DROP_TABLE:
1133  cmdtag = "DROP TABLE";
1134  recv_step++;
1135  break;
1136  case BI_CREATE_TABLE:
1137  cmdtag = "CREATE TABLE";
1138  recv_step++;
1139  break;
1140  case BI_PREPARE:
1141  cmdtag = "";
1142  description = "PREPARE";
1143  recv_step++;
1144  break;
1145  case BI_INSERT_ROWS:
1146  cmdtag = "INSERT";
1147  rows_to_receive--;
1148  if (rows_to_receive == 0)
1149  recv_step++;
1150  break;
1151  case BI_COMMIT_TX:
1152  cmdtag = "COMMIT";
1153  recv_step++;
1154  break;
1155  case BI_SYNC:
1156  cmdtag = "";
1157  description = "SYNC";
1158  status = PGRES_PIPELINE_SYNC;
1159  recv_step++;
1160  break;
1161  case BI_DONE:
1162  /* unreachable */
1163  pg_fatal("unreachable state");
1164  }
1165 
1166  if (PQresultStatus(res) != status)
1167  pg_fatal("%s reported status %s, expected %s\n"
1168  "Error message: \"%s\"",
1170  PQresStatus(status), PQerrorMessage(conn));
1171 
1172  if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1173  pg_fatal("%s expected command tag '%s', got '%s'",
1174  description, cmdtag, PQcmdStatus(res));
1175 
1176  pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1177 
1178  PQclear(res);
1179  }
1180  }
1181 
1182  /* Write more rows and/or the end pipeline message, if needed */
1183  if (FD_ISSET(sock, &output_mask))
1184  {
1185  PQflush(conn);
1186 
1187  if (send_step == BI_INSERT_ROWS)
1188  {
1189  snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1190  /* use up some buffer space with a wide value */
1191  snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1192 
1193  if (PQsendQueryPrepared(conn, "my_insert",
1194  2, insert_params, NULL, NULL, 0) == 1)
1195  {
1196  pg_debug("sent row %d\n", rows_to_send);
1197 
1198  rows_to_send--;
1199  if (rows_to_send == 0)
1200  send_step++;
1201  }
1202  else
1203  {
1204  /*
1205  * in nonblocking mode, so it's OK for an insert to fail
1206  * to send
1207  */
1208  fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1209  rows_to_send, PQerrorMessage(conn));
1210  }
1211  }
1212  else if (send_step == BI_COMMIT_TX)
1213  {
1214  if (PQsendQueryParams(conn, "COMMIT",
1215  0, NULL, NULL, NULL, NULL, 0) == 1)
1216  {
1217  pg_debug("sent COMMIT\n");
1218  send_step++;
1219  }
1220  else
1221  {
1222  fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223  PQerrorMessage(conn));
1224  }
1225  }
1226  else if (send_step == BI_SYNC)
1227  {
1228  if (PQpipelineSync(conn) == 1)
1229  {
1230  fprintf(stdout, "pipeline sync sent\n");
1231  send_step++;
1232  }
1233  else
1234  {
1235  fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236  PQerrorMessage(conn));
1237  }
1238  }
1239  }
1240  }
1241 
1242  /* We've got the sync message and the pipeline should be done */
1243  if (PQexitPipelineMode(conn) != 1)
1244  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245  PQerrorMessage(conn));
1246 
1247  if (PQsetnonblocking(conn, 0) != 0)
1248  pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1249 
1250  fprintf(stderr, "ok\n");
1251 }
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1536
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
#define MAXINT8LEN
static const char *const insert_sql2
#define MAXINTLEN
PipelineInsertStep
#define snprintf
Definition: port.h:238
const char * description

References Assert(), BI_BEGIN_TX, BI_COMMIT_TX, BI_CREATE_TABLE, BI_DONE, BI_DROP_TABLE, BI_INSERT_ROWS, BI_PREPARE, BI_SYNC, conn, create_table_sql, description, drop_table_sql, exit_nicely(), fprintf(), insert_sql2, MAXINT8LEN, MAXINTLEN, pg_debug, pg_fatal, PGRES_COMMAND_OK, PGRES_PIPELINE_SYNC, PQclear(), PQcmdStatus(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQflush(), PQgetResult(), PQisBusy(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), res, select, snprintf, and generate_unaccent_rules::stdout.

Referenced by main().

◆ test_prepared()

static void test_prepared ( PGconn * conn)
static

Definition at line 1254 of file libpq_pipeline.c.

1255 {
1256  PGresult *res = NULL;
1257  Oid param_oids[1] = {INT4OID};
1258  Oid expected_oids[4];
1259  Oid typ;
1260 
1261  fprintf(stderr, "prepared... ");
1262 
1263  if (PQenterPipelineMode(conn) != 1)
1264  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1265  if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1266  "interval '1 sec'",
1267  1, param_oids) != 1)
1268  pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1269  expected_oids[0] = INT4OID;
1270  expected_oids[1] = TEXTOID;
1271  expected_oids[2] = NUMERICOID;
1272  expected_oids[3] = INTERVALOID;
1273  if (PQsendDescribePrepared(conn, "select_one") != 1)
1274  pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1275  if (PQpipelineSync(conn) != 1)
1276  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1277 
1278  res = PQgetResult(conn);
1279  if (res == NULL)
1280  pg_fatal("PQgetResult returned null");
1282  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1283  PQclear(res);
1284  res = PQgetResult(conn);
1285  if (res != NULL)
1286  pg_fatal("expected NULL result");
1287 
1288  res = PQgetResult(conn);
1289  if (res == NULL)
1290  pg_fatal("PQgetResult returned NULL");
1292  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1293  if (PQnfields(res) != lengthof(expected_oids))
1294  pg_fatal("expected %zu columns, got %d",
1295  lengthof(expected_oids), PQnfields(res));
1296  for (int i = 0; i < PQnfields(res); i++)
1297  {
1298  typ = PQftype(res, i);
1299  if (typ != expected_oids[i])
1300  pg_fatal("field %d: expected type %u, got %u",
1301  i, expected_oids[i], typ);
1302  }
1303  PQclear(res);
1304  res = PQgetResult(conn);
1305  if (res != NULL)
1306  pg_fatal("expected NULL result");
1307 
1308  res = PQgetResult(conn);
1310  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1311 
1312  fprintf(stderr, "closing statement..");
1313  if (PQsendClosePrepared(conn, "select_one") != 1)
1314  pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1315  if (PQpipelineSync(conn) != 1)
1316  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1317 
1318  res = PQgetResult(conn);
1319  if (res == NULL)
1320  pg_fatal("expected non-NULL result");
1322  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1323  PQclear(res);
1324  res = PQgetResult(conn);
1325  if (res != NULL)
1326  pg_fatal("expected NULL result");
1327  res = PQgetResult(conn);
1329  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1330 
1331  if (PQexitPipelineMode(conn) != 1)
1332  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1333 
1334  /* Now that it's closed we should get an error when describing */
1335  res = PQdescribePrepared(conn, "select_one");
1337  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1338 
1339  /*
1340  * Also test the blocking close, this should not fail since closing a
1341  * non-existent prepared statement is a no-op
1342  */
1343  res = PQclosePrepared(conn, "select_one");
1345  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1346 
1347  fprintf(stderr, "creating portal... ");
1348  PQexec(conn, "BEGIN");
1349  PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1351  if (PQsendDescribePortal(conn, "cursor_one") != 1)
1352  pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1353  if (PQpipelineSync(conn) != 1)
1354  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1355  res = PQgetResult(conn);
1356  if (res == NULL)
1357  pg_fatal("PQgetResult returned null");
1359  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1360 
1361  typ = PQftype(res, 0);
1362  if (typ != INT4OID)
1363  pg_fatal("portal: expected type %u, got %u",
1364  INT4OID, typ);
1365  PQclear(res);
1366  res = PQgetResult(conn);
1367  if (res != NULL)
1368  pg_fatal("expected NULL result");
1369  res = PQgetResult(conn);
1371  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1372 
1373  fprintf(stderr, "closing portal... ");
1374  if (PQsendClosePortal(conn, "cursor_one") != 1)
1375  pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1376  if (PQpipelineSync(conn) != 1)
1377  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1378 
1379  res = PQgetResult(conn);
1380  if (res == NULL)
1381  pg_fatal("expected non-NULL result");
1383  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1384  PQclear(res);
1385  res = PQgetResult(conn);
1386  if (res != NULL)
1387  pg_fatal("expected NULL result");
1388  res = PQgetResult(conn);
1390  pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1391 
1392  if (PQexitPipelineMode(conn) != 1)
1393  pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1394 
1395  /* Now that it's closed we should get an error when describing */
1396  res = PQdescribePortal(conn, "cursor_one");
1398  pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1399 
1400  /*
1401  * Also test the blocking close, this should not fail since closing a
1402  * non-existent portal is a no-op
1403  */
1404  res = PQclosePortal(conn, "cursor_one");
1406  pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1407 
1408  fprintf(stderr, "ok\n");
1409 }
#define lengthof(array)
Definition: c.h:788
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2569
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2455
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2556
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2539
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2474
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2491
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2504
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2521
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489

References conn, fprintf(), i, lengthof, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_SYNC, PQclear(), PQclosePortal(), PQclosePrepared(), PQdescribePortal(), PQdescribePrepared(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQftype(), PQgetResult(), PQnfields(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendClosePortal(), PQsendClosePrepared(), PQsendDescribePortal(), PQsendDescribePrepared(), PQsendPrepare(), and res.

Referenced by main().

◆ test_simple_pipeline()

static void test_simple_pipeline ( PGconn * conn)
static

Definition at line 1490 of file libpq_pipeline.c.

1491 {
1492  PGresult *res = NULL;
1493  const char *dummy_params[1] = {"1"};
1494  Oid dummy_param_oids[1] = {INT4OID};
1495 
1496  fprintf(stderr, "simple pipeline... ");
1497 
1498  /*
1499  * Enter pipeline mode and dispatch a set of operations, which we'll then
1500  * process the results of as they come in.
1501  *
1502  * For a simple case we should be able to do this without interim
1503  * processing of results since our output buffer will give us enough slush
1504  * to work with and we won't block on sending. So blocking mode is fine.
1505  */
1506  if (PQisnonblocking(conn))
1507  pg_fatal("Expected blocking connection mode");
1508 
1509  if (PQenterPipelineMode(conn) != 1)
1510  pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1511 
1512  if (PQsendQueryParams(conn, "SELECT $1",
1513  1, dummy_param_oids, dummy_params,
1514  NULL, NULL, 0) != 1)
1515  pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1516 
1517  if (PQexitPipelineMode(conn) != 0)
1518  pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1519 
1520  if (PQpipelineSync(conn) != 1)
1521  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1522 
1523  res = PQgetResult(conn);
1524  if (res == NULL)
1525  pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1526  PQerrorMessage(conn));
1527 
1529  pg_fatal("Unexpected result code %s from first pipeline item",
1531 
1532  PQclear(res);
1533  res = NULL;
1534 
1535  if (PQgetResult(conn) != NULL)
1536  pg_fatal("PQgetResult returned something extra after first query result.");
1537 
1538  /*
1539  * Even though we've processed the result there's still a sync to come and
1540  * we can't exit pipeline mode yet
1541  */
1542  if (PQexitPipelineMode(conn) != 0)
1543  pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1544 
1545  res = PQgetResult(conn);
1546  if (res == NULL)
1547  pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1548  PQerrorMessage(conn));
1549 
1551  pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1553 
1554  PQclear(res);
1555  res = NULL;
1556 
1557  if (PQgetResult(conn) != NULL)
1558  pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1560 
1561  /* We're still in pipeline mode... */
1563  pg_fatal("Fell out of pipeline mode somehow");
1564 
1565  /* ... until we end it, which we can safely do now */
1566  if (PQexitPipelineMode(conn) != 1)
1567  pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1568  PQerrorMessage(conn));
1569 
1571  pg_fatal("Exiting pipeline mode didn't seem to work");
1572 
1573  fprintf(stderr, "ok\n");
1574 }

References conn, fprintf(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQisnonblocking(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendQueryParams(), and res.

Referenced by main().

◆ test_singlerowmode()

static void test_singlerowmode ( PGconn * conn)
static

Definition at line 1577 of file libpq_pipeline.c.

1578 {
1579  PGresult *res;
1580  int i;
1581  bool pipeline_ended = false;
1582 
1583  if (PQenterPipelineMode(conn) != 1)
1584  pg_fatal("failed to enter pipeline mode: %s",
1585  PQerrorMessage(conn));
1586 
1587  /* One series of three commands, using single-row mode for the first two. */
1588  for (i = 0; i < 3; i++)
1589  {
1590  char *param[1];
1591 
1592  param[0] = psprintf("%d", 44 + i);
1593 
1594  if (PQsendQueryParams(conn,
1595  "SELECT generate_series(42, $1)",
1596  1,
1597  NULL,
1598  (const char **) param,
1599  NULL,
1600  NULL,
1601  0) != 1)
1602  pg_fatal("failed to send query: %s",
1603  PQerrorMessage(conn));
1604  pfree(param[0]);
1605  }
1606  if (PQpipelineSync(conn) != 1)
1607  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1608 
1609  for (i = 0; !pipeline_ended; i++)
1610  {
1611  bool first = true;
1612  bool saw_ending_tuplesok;
1613  bool isSingleTuple = false;
1614 
1615  /* Set single row mode for only first 2 SELECT queries */
1616  if (i < 2)
1617  {
1618  if (PQsetSingleRowMode(conn) != 1)
1619  pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1620  }
1621 
1622  /* Consume rows for this query */
1623  saw_ending_tuplesok = false;
1624  while ((res = PQgetResult(conn)) != NULL)
1625  {
1627 
1628  if (est == PGRES_PIPELINE_SYNC)
1629  {
1630  fprintf(stderr, "end of pipeline reached\n");
1631  pipeline_ended = true;
1632  PQclear(res);
1633  if (i != 3)
1634  pg_fatal("Expected three results, got %d", i);
1635  break;
1636  }
1637 
1638  /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1639  if (first)
1640  {
1641  if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1642  pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1643  i, PQresStatus(est));
1644  if (i >= 2 && est != PGRES_TUPLES_OK)
1645  pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1646  i, PQresStatus(est));
1647  first = false;
1648  }
1649 
1650  fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1651  switch (est)
1652  {
1653  case PGRES_TUPLES_OK:
1654  fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1655  saw_ending_tuplesok = true;
1656  if (isSingleTuple)
1657  {
1658  if (PQntuples(res) == 0)
1659  fprintf(stderr, "all tuples received in query %d\n", i);
1660  else
1661  pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1662  }
1663  break;
1664 
1665  case PGRES_SINGLE_TUPLE:
1666  isSingleTuple = true;
1667  fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1668  break;
1669 
1670  default:
1671  pg_fatal("unexpected");
1672  }
1673  PQclear(res);
1674  }
1675  if (!pipeline_ended && !saw_ending_tuplesok)
1676  pg_fatal("didn't get expected terminating TUPLES_OK");
1677  }
1678 
1679  /*
1680  * Now issue one command, get its results in with single-row mode, then
1681  * issue another command, and get its results in normal mode; make sure
1682  * the single-row mode flag is reset as expected.
1683  */
1684  if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1685  0, NULL, NULL, NULL, NULL, 0) != 1)
1686  pg_fatal("failed to send query: %s",
1687  PQerrorMessage(conn));
1688  if (PQsendFlushRequest(conn) != 1)
1689  pg_fatal("failed to send flush request");
1690  if (PQsetSingleRowMode(conn) != 1)
1691  pg_fatal("PQsetSingleRowMode() failed");
1692  res = PQgetResult(conn);
1693  if (res == NULL)
1694  pg_fatal("unexpected NULL");
1696  pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1698  res = PQgetResult(conn);
1699  if (res == NULL)
1700  pg_fatal("unexpected NULL");
1702  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1704  if (PQgetResult(conn) != NULL)
1705  pg_fatal("expected NULL result");
1706 
1707  if (PQsendQueryParams(conn, "SELECT 1",
1708  0, NULL, NULL, NULL, NULL, 0) != 1)
1709  pg_fatal("failed to send query: %s",
1710  PQerrorMessage(conn));
1711  if (PQsendFlushRequest(conn) != 1)
1712  pg_fatal("failed to send flush request");
1713  res = PQgetResult(conn);
1714  if (res == NULL)
1715  pg_fatal("unexpected NULL");
1717  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1719  if (PQgetResult(conn) != NULL)
1720  pg_fatal("expected NULL result");
1721 
1722  /*
1723  * Try chunked mode as well; make sure that it correctly delivers a
1724  * partial final chunk.
1725  */
1726  if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1727  0, NULL, NULL, NULL, NULL, 0) != 1)
1728  pg_fatal("failed to send query: %s",
1729  PQerrorMessage(conn));
1730  if (PQsendFlushRequest(conn) != 1)
1731  pg_fatal("failed to send flush request");
1732  if (PQsetChunkedRowsMode(conn, 3) != 1)
1733  pg_fatal("PQsetChunkedRowsMode() failed");
1734  res = PQgetResult(conn);
1735  if (res == NULL)
1736  pg_fatal("unexpected NULL");
1738  pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1740  PQerrorMessage(conn));
1741  if (PQntuples(res) != 3)
1742  pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1743  res = PQgetResult(conn);
1744  if (res == NULL)
1745  pg_fatal("unexpected NULL");
1747  pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1749  if (PQntuples(res) != 2)
1750  pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1751  res = PQgetResult(conn);
1752  if (res == NULL)
1753  pg_fatal("unexpected NULL");
1755  pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1757  if (PQntuples(res) != 0)
1758  pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1759  if (PQgetResult(conn) != NULL)
1760  pg_fatal("expected NULL result");
1761 
1762  if (PQexitPipelineMode(conn) != 1)
1763  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1764 
1765  fprintf(stderr, "ok\n");
1766 }
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1965
ExecStatusType
Definition: libpq-fe.h:98
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:117
void pfree(void *pointer)
Definition: mcxt.c:1520
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46

References conn, fprintf(), i, pfree(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_CHUNK, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsetChunkedRowsMode(), PQsetSingleRowMode(), psprintf(), and res.

Referenced by main().

◆ test_transaction()

static void test_transaction ( PGconn * conn)
static

Definition at line 1773 of file libpq_pipeline.c.

1774 {
1775  PGresult *res;
1776  bool expect_null;
1777  int num_syncs = 0;
1778 
1779  res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1780  "CREATE TABLE pq_pipeline_tst (id int)");
1782  pg_fatal("failed to create test table: %s",
1783  PQerrorMessage(conn));
1784  PQclear(res);
1785 
1786  if (PQenterPipelineMode(conn) != 1)
1787  pg_fatal("failed to enter pipeline mode: %s",
1788  PQerrorMessage(conn));
1789  if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1790  pg_fatal("could not send prepare on pipeline: %s",
1791  PQerrorMessage(conn));
1792 
1793  if (PQsendQueryParams(conn,
1794  "BEGIN",
1795  0, NULL, NULL, NULL, NULL, 0) != 1)
1796  pg_fatal("failed to send query: %s",
1797  PQerrorMessage(conn));
1798  if (PQsendQueryParams(conn,
1799  "SELECT 0/0",
1800  0, NULL, NULL, NULL, NULL, 0) != 1)
1801  pg_fatal("failed to send query: %s",
1802  PQerrorMessage(conn));
1803 
1804  /*
1805  * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1806  * get out of the pipeline-aborted state first.
1807  */
1808  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1809  pg_fatal("failed to execute prepared: %s",
1810  PQerrorMessage(conn));
1811 
1812  /* This insert fails because we're in pipeline-aborted state */
1813  if (PQsendQueryParams(conn,
1814  "INSERT INTO pq_pipeline_tst VALUES (1)",
1815  0, NULL, NULL, NULL, NULL, 0) != 1)
1816  pg_fatal("failed to send query: %s",
1817  PQerrorMessage(conn));
1818  if (PQpipelineSync(conn) != 1)
1819  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1820  num_syncs++;
1821 
1822  /*
1823  * This insert fails even though the pipeline got a SYNC, because we're in
1824  * an aborted transaction
1825  */
1826  if (PQsendQueryParams(conn,
1827  "INSERT INTO pq_pipeline_tst VALUES (2)",
1828  0, NULL, NULL, NULL, NULL, 0) != 1)
1829  pg_fatal("failed to send query: %s",
1830  PQerrorMessage(conn));
1831  if (PQpipelineSync(conn) != 1)
1832  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1833  num_syncs++;
1834 
1835  /*
1836  * Send ROLLBACK using prepared stmt. This one works because we just did
1837  * PQpipelineSync above.
1838  */
1839  if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1840  pg_fatal("failed to execute prepared: %s",
1841  PQerrorMessage(conn));
1842 
1843  /*
1844  * Now that we're out of a transaction and in pipeline-good mode, this
1845  * insert works
1846  */
1847  if (PQsendQueryParams(conn,
1848  "INSERT INTO pq_pipeline_tst VALUES (3)",
1849  0, NULL, NULL, NULL, NULL, 0) != 1)
1850  pg_fatal("failed to send query: %s",
1851  PQerrorMessage(conn));
1852  /* Send two syncs now -- match up to SYNC messages below */
1853  if (PQpipelineSync(conn) != 1)
1854  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1855  num_syncs++;
1856  if (PQpipelineSync(conn) != 1)
1857  pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1858  num_syncs++;
1859 
1860  expect_null = false;
1861  for (int i = 0;; i++)
1862  {
1863  ExecStatusType restype;
1864 
1865  res = PQgetResult(conn);
1866  if (res == NULL)
1867  {
1868  printf("%d: got NULL result\n", i);
1869  if (!expect_null)
1870  pg_fatal("did not expect NULL here");
1871  expect_null = false;
1872  continue;
1873  }
1874  restype = PQresultStatus(res);
1875  printf("%d: got status %s", i, PQresStatus(restype));
1876  if (expect_null)
1877  pg_fatal("expected NULL");
1878  if (restype == PGRES_FATAL_ERROR)
1879  printf("; error: %s", PQerrorMessage(conn));
1880  else if (restype == PGRES_PIPELINE_ABORTED)
1881  {
1882  printf(": command didn't run because pipeline aborted\n");
1883  }
1884  else
1885  printf("\n");
1886  PQclear(res);
1887 
1888  if (restype == PGRES_PIPELINE_SYNC)
1889  num_syncs--;
1890  else
1891  expect_null = true;
1892  if (num_syncs <= 0)
1893  break;
1894  }
1895  if (PQgetResult(conn) != NULL)
1896  pg_fatal("returned something extra after all the syncs: %s",
1898 
1899  if (PQexitPipelineMode(conn) != 1)
1900  pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1901 
1902  /* We expect to find one tuple containing the value "3" */
1903  res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1905  pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1906  if (PQntuples(res) != 1)
1907  pg_fatal("did not get 1 tuple");
1908  if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1909  pg_fatal("did not get expected tuple");
1910  PQclear(res);
1911 
1912  fprintf(stderr, "ok\n");
1913 }

References conn, fprintf(), i, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), printf, and res.

Referenced by main().

◆ test_uniqviol()

static void test_uniqviol ( PGconn * conn)
static

Definition at line 1921 of file libpq_pipeline.c.

1922 {
1923  int sock = PQsocket(conn);
1924  PGresult *res;
1925  Oid paramTypes[2] = {INT8OID, INT8OID};
1926  const char *paramValues[2];
1927  char paramValue0[MAXINT8LEN];
1928  char paramValue1[MAXINT8LEN];
1929  int ctr = 0;
1930  int numsent = 0;
1931  int results = 0;
1932  bool read_done = false;
1933  bool write_done = false;
1934  bool error_sent = false;
1935  bool got_error = false;
1936  int switched = 0;
1937  int socketful = 0;
1938  fd_set in_fds;
1939  fd_set out_fds;
1940 
1941  fprintf(stderr, "uniqviol ...");
1942 
1943  PQsetnonblocking(conn, 1);
1944 
1945  paramValues[0] = paramValue0;
1946  paramValues[1] = paramValue1;
1947  sprintf(paramValue1, "42");
1948 
1949  res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1950  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1952  pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1953 
1954  res = PQexec(conn, "begin");
1956  pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1957 
1958  res = PQprepare(conn, "insertion",
1959  "insert into ppln_uniqviol values ($1, $2) returning id",
1960  2, paramTypes);
1961  if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1962  pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1963 
1964  if (PQenterPipelineMode(conn) != 1)
1965  pg_fatal("failed to enter pipeline mode");
1966 
1967  while (!read_done)
1968  {
1969  /*
1970  * Avoid deadlocks by reading everything the server has sent before
1971  * sending anything. (Special precaution is needed here to process
1972  * PQisBusy before testing the socket for read-readiness, because the
1973  * socket does not turn read-ready after "sending" queries in aborted
1974  * pipeline mode.)
1975  */
1976  while (PQisBusy(conn) == 0)
1977  {
1978  bool new_error;
1979 
1980  if (results >= numsent)
1981  {
1982  if (write_done)
1983  read_done = true;
1984  break;
1985  }
1986 
1987  res = PQgetResult(conn);
1988  new_error = process_result(conn, res, results, numsent);
1989  if (new_error && got_error)
1990  pg_fatal("got two errors");
1991  got_error |= new_error;
1992  if (results++ >= numsent - 1)
1993  {
1994  if (write_done)
1995  read_done = true;
1996  break;
1997  }
1998  }
1999 
2000  if (read_done)
2001  break;
2002 
2003  FD_ZERO(&out_fds);
2004  FD_SET(sock, &out_fds);
2005 
2006  FD_ZERO(&in_fds);
2007  FD_SET(sock, &in_fds);
2008 
2009  if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2010  {
2011  if (errno == EINTR)
2012  continue;
2013  pg_fatal("select() failed: %m");
2014  }
2015 
2016  if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2017  pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2018 
2019  /*
2020  * If the socket is writable and we haven't finished sending queries,
2021  * send some.
2022  */
2023  if (!write_done && FD_ISSET(sock, &out_fds))
2024  {
2025  for (;;)
2026  {
2027  int flush;
2028 
2029  /*
2030  * provoke uniqueness violation exactly once after having
2031  * switched to read mode.
2032  */
2033  if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2034  {
2035  sprintf(paramValue0, "%d", numsent / 2);
2036  fprintf(stderr, "E");
2037  error_sent = true;
2038  }
2039  else
2040  {
2041  fprintf(stderr, ".");
2042  sprintf(paramValue0, "%d", ctr++);
2043  }
2044 
2045  if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2046  pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2047  numsent++;
2048 
2049  /* Are we done writing? */
2050  if (socketful != 0 && numsent % socketful == 42 && error_sent)
2051  {
2052  if (PQsendFlushRequest(conn) != 1)
2053  pg_fatal("failed to send flush request");
2054  write_done = true;
2055  fprintf(stderr, "\ndone writing\n");
2056  PQflush(conn);
2057  break;
2058  }
2059 
2060  /* is the outgoing socket full? */
2061  flush = PQflush(conn);
2062  if (flush == -1)
2063  pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2064  if (flush == 1)
2065  {
2066  if (socketful == 0)
2067  socketful = numsent;
2068  fprintf(stderr, "\nswitch to reading\n");
2069  switched++;
2070  break;
2071  }
2072  }
2073  }
2074  }
2075 
2076  if (!got_error)
2077  pg_fatal("did not get expected error");
2078 
2079  fprintf(stderr, "ok\n");
2080 }
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
#define sprintf
Definition: port.h:240

References conn, EINTR, fprintf(), MAXINT8LEN, pg_fatal, PGRES_COMMAND_OK, PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQflush(), PQgetResult(), PQisBusy(), PQprepare(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), process_result(), res, select, and sprintf.

Referenced by main().

◆ usage()

static void usage ( const char *  progname)
static

Definition at line 2137 of file libpq_pipeline.c.

/*
 * usage
 *
 * Write the command-line help text to stderr, substituting progname into the
 * banner and the two invocation forms.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	/* banner */
	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);

	/* invocation forms */
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* option summary */
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
const char *const progname

References fprintf(), and progname.

Referenced by main().

◆ va_end()

◆ va_start()

◆ vfprintf()

vfprintf ( stderr  ,
fmt  ,
args   
)

◆ wait_for_connection_state()

static void wait_for_connection_state ( int  line,
PGconn * monitorConn,
int  procpid,
char *  state,
char *  event 
)
static

Definition at line 123 of file libpq_pipeline.c.

125 {
126  const Oid paramTypes[] = {INT4OID, TEXTOID};
127  const char *paramValues[2];
128  char *pidstr = psprintf("%d", procpid);
129 
130  Assert((state == NULL) ^ (event == NULL));
131 
132  paramValues[0] = pidstr;
133  paramValues[1] = state ? state : event;
134 
135  while (true)
136  {
137  PGresult *res;
138  char *value;
139 
140  if (state != NULL)
141  res = PQexecParams(monitorConn,
142  "SELECT count(*) FROM pg_stat_activity WHERE "
143  "pid = $1 AND state = $2",
144  2, paramTypes, paramValues, NULL, NULL, 0);
145  else
146  res = PQexecParams(monitorConn,
147  "SELECT count(*) FROM pg_stat_activity WHERE "
148  "pid = $1 AND wait_event = $2",
149  2, paramTypes, paramValues, NULL, NULL, 0);
150 
152  pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
153  if (PQntuples(res) != 1)
154  pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
155  if (PQnfields(res) != 1)
156  pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
157  value = PQgetvalue(res, 0, 0);
158  if (strcmp(value, "0") != 0)
159  {
160  PQclear(res);
161  break;
162  }
163  PQclear(res);
164 
165  /* wait 10ms before polling again */
166  pg_usleep(10000);
167  }
168 
169  pfree(pidstr);
170 }
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
static struct @155 value
void pg_usleep(long microsec)
Definition: signal.c:53
Definition: regguts.h:323

References Assert(), pfree(), pg_usleep(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexecParams(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), psprintf(), res, and value.

Referenced by send_cancellable_query_impl().

Variable Documentation

◆ create_table_sql

const char* const create_table_sql
static
Initial value:
=
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
"int8filler int8);"

Definition at line 48 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ drop_table_sql

const char* const drop_table_sql
static
Initial value:
=
"DROP TABLE IF EXISTS pq_pipeline_demo"

Definition at line 46 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ fmt

static void const char * fmt

Definition at line 29 of file libpq_pipeline.c.

Referenced by _allocAH(), ahprintf(), appendJSONKeyValueFmt(), appendPQExpBuffer(), appendPQExpBufferVA(), appendStringInfo(), appendStringInfoVA(), archprintf(), bail_out(), CreateArchive(), date_test_defmt(), date_test_fmt(), datetime_to_char_body(), dblink_res_error(), do_pset(), do_serialize(), do_to_timestamp(), dttofmtasc_replace(), ecpg_log(), emit_tap_output(), emit_tap_output_v(), err(), errmsg(), errmsg_internal(), errx(), exec_prog(), executeQueryOrDie(), find_end_token(), float4_to_char(), float8_to_char(), fmtfloat(), fmtlong(), format_elog_string(), int4_to_char(), int8_to_char(), interval_to_char(), libpq_append_conn_error(), libpq_append_error(), log_error(), main(), manifest_report_error(), my_strftime(), numeric_to_char(), numeric_to_number(), OpenArchive(), parallel_exec_prog(), parse_datetime(), pg_fatal(), pg_fprintf(), pg_log(), pg_log_filter_error(), pg_log_generic(), pg_log_generic_v(), pg_log_v(), pg_printf(), pg_snprintf(), pg_sprintf(), pg_strfromd(), pg_vfprintf(), pg_vprintf(), pg_vsnprintf(), pg_vsprintf(), PGTYPESdate_defmt_asc(), PGTYPEStimestamp_defmt_asc(), PGTYPEStimestamp_defmt_scan(), PLy_elog_impl(), PLy_exception_set(), pqInternalNotice(), prep_status(), prep_status_progress(), print_lo_result(), printfPQExpBuffer(), ProcessCopyOptions(), psprintf(), pvsnprintf(), px_debug(), rdefmtdate(), ReadHead(), report_backup_error(), report_fatal_error(), report_invalid_record(), report_manifest_error(), report_status(), ReportWalSummaryError(), rfmtdate(), rfmtlong(), tarPrintf(), text_format(), timestamp_to_char(), timestamptz_to_char(), to_date(), to_timestamp(), walsummary_error_callback(), warn_or_exit_horribly(), and write_stderr().

◆ insert_sql

const char* const insert_sql
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)"

Definition at line 51 of file libpq_pipeline.c.

Referenced by test_pipeline_abort().

◆ insert_sql2

const char* const insert_sql2
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)"

Definition at line 53 of file libpq_pipeline.c.

Referenced by test_pipelined_insert().

◆ progname

const char* const progname = "libpq_pipeline"

Definition at line 34 of file libpq_pipeline.c.

Referenced by usage().

◆ tracefile

char* tracefile = NULL

Definition at line 37 of file libpq_pipeline.c.

Referenced by main().