PostgreSQL Source Code git master
libpq_pipeline.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/time.h>
#include "catalog/pg_type_d.h"
#include "libpq-fe.h"
#include "pg_getopt.h"
Include dependency graph for libpq_pipeline.c:

Go to the source code of this file.

Macros

#define pg_debug(...)
 
#define MAXINTLEN   12
 
#define MAXINT8LEN   20
 
#define pg_fatal(...)   pg_fatal_impl(__LINE__, __VA_ARGS__)
 
#define confirm_query_canceled(conn)   confirm_query_canceled_impl(__LINE__, conn)
 
#define send_cancellable_query(conn, monitorConn)    send_cancellable_query_impl(__LINE__, conn, monitorConn)
 

Enumerations

enum  PipelineInsertStep {
  BI_BEGIN_TX , BI_DROP_TABLE , BI_CREATE_TABLE , BI_PREPARE ,
  BI_INSERT_ROWS , BI_COMMIT_TX , BI_SYNC , BI_DONE
}
 

Functions

static void exit_nicely (PGconn *conn)
 
static void pg_attribute_noreturn () pg_fatal_impl(int line
 
static void const char pg_attribute_printf (2, 3)
 
static bool process_result (PGconn *conn, PGresult *res, int results, int numsent)
 
static void const char fflush (stdout)
 
 fprintf (stderr, "\n%s:%d: ", progname, line)
 
 va_start (args, fmt)
 
 vfprintf (stderr, fmt, args)
 
 va_end (args)
 
 Assert (fmt[strlen(fmt) - 1] !='\n')
 
 fprintf (stderr, "\n")
 
 exit (1)
 
static void confirm_query_canceled_impl (int line, PGconn *conn)
 
static void wait_for_connection_state (int line, PGconn *monitorConn, int procpid, char *state, char *event)
 
static void send_cancellable_query_impl (int line, PGconn *conn, PGconn *monitorConn)
 
static PGconn * copy_connection (PGconn *conn)
 
static void test_cancel (PGconn *conn)
 
static void test_disallowed_in_pipeline (PGconn *conn)
 
static void test_multi_pipelines (PGconn *conn)
 
static void test_nosync (PGconn *conn)
 
static void test_pipeline_abort (PGconn *conn)
 
static void test_pipelined_insert (PGconn *conn, int n_rows)
 
static void test_prepared (PGconn *conn)
 
static void notice_processor (void *arg, const char *message)
 
static void test_pipeline_idle (PGconn *conn)
 
static void test_simple_pipeline (PGconn *conn)
 
static void test_singlerowmode (PGconn *conn)
 
static void test_transaction (PGconn *conn)
 
static void test_uniqviol (PGconn *conn)
 
static void usage (const char *progname)
 
static void print_test_list (void)
 
int main (int argc, char **argv)
 

Variables

static void const char * fmt
 
static const char *const progname = "libpq_pipeline"
 
static char * tracefile = NULL
 
static const char *const drop_table_sql
 
static const char *const create_table_sql
 
static const char *const insert_sql
 
static const char *const insert_sql2
 

Macro Definition Documentation

◆ confirm_query_canceled

#define confirm_query_canceled (   conn)    confirm_query_canceled_impl(__LINE__, conn)

Definition at line 94 of file libpq_pipeline.c.

◆ MAXINT8LEN

#define MAXINT8LEN   20

Definition at line 56 of file libpq_pipeline.c.

◆ MAXINTLEN

#define MAXINTLEN   12

Definition at line 55 of file libpq_pipeline.c.

◆ pg_debug

#define pg_debug (   ...)

Definition at line 41 of file libpq_pipeline.c.

◆ pg_fatal

#define pg_fatal (   ...)    pg_fatal_impl(__LINE__, __VA_ARGS__)

Definition at line 73 of file libpq_pipeline.c.

◆ send_cancellable_query

#define send_cancellable_query (   conn,
  monitorConn 
)     send_cancellable_query_impl(__LINE__, conn, monitorConn)

Definition at line 170 of file libpq_pipeline.c.

Enumeration Type Documentation

◆ PipelineInsertStep

Enumerator
BI_BEGIN_TX 
BI_DROP_TABLE 
BI_CREATE_TABLE 
BI_PREPARE 
BI_INSERT_ROWS 
BI_COMMIT_TX 
BI_SYNC 
BI_DONE 

Definition at line 993 of file libpq_pipeline.c.

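994{
995 BI_BEGIN_TX,
996 BI_DROP_TABLE,
997 BI_CREATE_TABLE,
998 BI_PREPARE,
999 BI_INSERT_ROWS,
1000 BI_COMMIT_TX,
1001 BI_SYNC,
1002 BI_DONE,
1003};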
@ BI_INSERT_ROWS
@ BI_BEGIN_TX
@ BI_CREATE_TABLE
@ BI_PREPARE
@ BI_DROP_TABLE
@ BI_SYNC
@ BI_DONE
@ BI_COMMIT_TX

Function Documentation

◆ Assert()

Assert ( fmt[strlen(fmt) - 1] != '\n' )

◆ confirm_query_canceled_impl()

static void confirm_query_canceled_impl ( int  line,
PGconn *  conn 
)
static

Definition at line 96 of file libpq_pipeline.c.

97{
98 PGresult *res = NULL;
99
101 if (res == NULL)
102 pg_fatal_impl(line, "PQgetResult returned null: %s",
105 pg_fatal_impl(line, "query did not fail when it was expected");
106 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
107 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
109 PQclear(res);
110
111 while (PQisBusy(conn))
113}
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7507
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
@ PGRES_FATAL_ERROR
Definition: libpq-fe.h:133
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
PGconn * conn
Definition: streamutil.c:53

References conn, PG_DIAG_SQLSTATE, PGRES_FATAL_ERROR, PQclear(), PQconsumeInput(), PQerrorMessage(), PQgetResult(), PQisBusy(), PQresultErrorField(), PQresultStatus(), and res.

◆ copy_connection()

static PGconn * copy_connection ( PGconn *  conn)
static

Definition at line 204 of file libpq_pipeline.c.

205{
206 PGconn *copyConn;
207 PQconninfoOption *opts = PQconninfo(conn);
208 const char **keywords;
209 const char **vals;
210 int nopts = 1;
211 int i = 0;
212
213 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
214 nopts++;
215
216 keywords = pg_malloc(sizeof(char *) * nopts);
217 vals = pg_malloc(sizeof(char *) * nopts);
218
219 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
220 {
221 if (opt->val)
222 {
223 keywords[i] = opt->keyword;
224 vals[i] = opt->val;
225 i++;
226 }
227 }
228 keywords[i] = vals[i] = NULL;
229
230 copyConn = PQconnectdbParams(keywords, vals, false);
231
232 if (PQstatus(copyConn) != CONNECTION_OK)
233 pg_fatal("Connection to database failed: %s",
234 PQerrorMessage(copyConn));
235
236 return copyConn;
237}
PQconninfoOption * PQconninfo(PGconn *conn)
Definition: fe-connect.c:7278
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7444
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:717
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int i
Definition: isn.c:72
static const JsonPathKeyword keywords[]
@ CONNECTION_OK
Definition: libpq-fe.h:81
#define pg_fatal(...)
static AmcheckOptions opts
Definition: pg_amcheck.c:112
const char * keyword

References conn, CONNECTION_OK, i, JsonPathKeyword::keyword, keywords, opts, pg_fatal, pg_malloc(), PQconnectdbParams(), PQconninfo(), PQerrorMessage(), and PQstatus().

Referenced by test_cancel().

◆ exit()

exit ( 1 )

Referenced by _check_database_version(), add_socket_to_set(), add_typedefs_from_file(), addtype(), adjleap(), adjust_data_dir(), append_database_pattern(), append_relation_pattern_helper(), append_schema_pattern(), appendPsqlMetaConnect(), appendQualifiedRelation(), appendShellString(), associate(), astreamer_zstd_compressor_new(), bail_out(), BaseBackup(), bootstrap_template1(), change_directory(), check_input(), check_locale_name(), check_ok(), check_prepare_conn(), check_publisher(), check_root(), check_subscriber(), check_testspec(), CheckConnection(), CheckDataVersion(), checkInitSteps(), close_cur1(), close_file(), cluster_one_database(), compile_database_list(), compile_relation_list_one_db(), connect_database(), connect_slot(), connectToServer(), create_data_directory(), create_xlog_or_symlink(), CreateBackupStreamer(), die_on_query_failure(), digestControlFile(), disconnect_database(), do_init(), do_kill(), do_logrotate(), do_promote(), do_reload(), do_restart(), do_start(), do_status(), do_stop(), dolink(), ecpg_filter_source(), ecpg_filter_stderr(), ecpg_start_test(), enlargeStringInfo(), ensureCleanShutdown(), err(), error(), errstart(), errx(), executeCommand(), executeQuery(), executeQueryOrDie(), executeStatement(), exit_nicely(), find_other_exec_or_die(), findBuiltin(), FindStreamingStart(), get_control_dbstate(), get_id(), get_opts(), get_pgpid(), get_record1(), get_restricted_token(), get_su_pwd(), get_table_relkind(), get_user_name_or_exit(), get_var1(), GetConnection(), getfields(), GetTableInfo(), GucInfoMain(), handle_args(), handle_help_version_opts(), HandleStartupProcInterrupts(), infile(), Initialize(), isolation_init(), isolation_start_test(), leapadd(), libpqsrv_cancel(), main(), memory_exhausted(), mkdirs(), mmfatal(), newabbr(), open_cur1(), open_walfile(), ParallelBackupStart(), parse_psql_options(), parseCommandLine(), parseServiceFile(), pg_ctl_status(), pg_fatal(), pg_log_v(), pg_malloc_internal(), pg_realloc(), pg_strdup(), 
pg_wcsformat(), pgfdw_get_cleanup_result(), pgwin32_is_admin(), pnstrdup(), postprocess_sql_command(), PQprint(), printTable(), printTableAddCell(), printTableAddHeader(), printTableInit(), proc_exit(), process_backslash_command(), process_directory_recursively(), psql_scan(), psql_scan_slash_option(), psql_start_test(), pvsnprintf(), read_controlfile(), read_dumpall_filters(), read_post_opts(), regression_main(), reindex_one_database(), replace_percent_placeholders(), report_backup_error(), report_clusters_compatible(), report_fatal_error(), report_manifest_error(), rewind_parseTimeLineHistory(), rpytime(), rulesub(), run_command(), run_permutation(), run_reindex_command(), runInitSteps(), s_lock_stuck(), save_ps_display_args(), scan_for_existing_tablespaces(), search_directory(), set_mode(), set_option(), set_sig(), setup_data_file_paths(), setup_locale_encoding(), setup_pgdata(), setup_publisher(), SetWALFileNameForCleanup(), spawn_process(), spec_yyerror(), sql_check(), sql_conn(), sql_exec(), start_postmaster(), StartLogStreamer(), startup_hacks(), StreamLog(), syntax_error(), test_convert(), test_one_vector(), test_timing(), threadRun(), time_overflow(), TransferPredicateLocksToNewTarget(), try_complete_step(), usage(), vacuum_delay_point(), vacuum_one_database(), WalRcvWaitForStartPosition(), walsummary_error_callback(), and writezone().

◆ exit_nicely()

static void exit_nicely ( PGconn *  conn)
static

Definition at line 59 of file libpq_pipeline.c.

60{
61 PQfinish(conn);
62 exit(1);
63}
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5178
exit(1)

References conn, exit(), and PQfinish().

Referenced by main(), test_nosync(), and test_pipelined_insert().

◆ fflush()

static void const char fflush ( stdout  )

Referenced by adjust_data_dir(), bootstrap_template1(), check_ok(), cluster_all_databases(), create_data_directory(), create_xlog_or_symlink(), do_copy(), do_init(), do_pg_backup_stop(), do_shell(), do_watch(), dumpnfa(), echo_hidden_command(), ecpg_log(), editFile(), emit_tap_output_v(), ensureCleanShutdown(), errfinish(), evaluate_backtick(), ExceptionalCondition(), exec_command_print(), exec_command_prompt(), exec_command_write(), exec_prog(), ExecQueryAndProcessResults(), ExecQueryTuples(), ExecuteRecoveryCommand(), fork_process(), get_bin_version(), get_control_data(), get_prompt(), get_su_pwd(), gets_interactive(), handleCopyIn(), handleCopyOut(), HandleSlashCmds(), initialize_data_directory(), InteractiveBackend(), log_pre_callback(), main(), MainLoop(), OpenPipeStream(), openQueryOutputFile(), PageOutput(), parallel_exec_prog(), parallel_transfer_all_new_dbs(), ParallelBackupStart(), parse_required_wal(), perform_spin_delay(), pg_log_generic_v(), pg_log_v(), pg_regcomp(), pg_regexec(), pipe_read_line(), plpgsql_dumptree(), popen_check(), pprint(), PQdisplayTuples(), pqFlush(), PQprint(), PQuntrace(), print(), print_filemap(), print_msg(), PrintNotifications(), PrintQueryStatus(), PrintQueryTuples(), printVersion(), psql_end_command(), PSQLexec(), regression_main(), reindex_all_databases(), RestoreArchivedFile(), run_diff(), runPgDump(), runShellCommand(), SendQuery(), setup_config(), shell_archive_file(), simple_prompt_extended(), spawn_process(), start_postmaster(), stop_postmaster(), SysLogger_Start(), test_config_settings(), test_file_descriptor_sync(), test_non_sync(), test_open_sync(), test_specific_config_settings(), test_sync(), vacuum_one_database(), vacuumlo(), and write_stderr().

◆ fprintf() [1/2]

fprintf ( stderr  ,
"\n"   
)

◆ fprintf() [2/2]

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2164 of file libpq_pipeline.c.

2165{
2166 const char *conninfo = "";
2167 PGconn *conn;
2168 FILE *trace;
2169 char *testname;
2170 int numrows = 10000;
2171 PGresult *res;
2172 int c;
2173
2174 while ((c = getopt(argc, argv, "r:t:")) != -1)
2175 {
2176 switch (c)
2177 {
2178 case 'r': /* numrows */
2179 errno = 0;
2180 numrows = strtol(optarg, NULL, 10);
2181 if (errno != 0 || numrows <= 0)
2182 {
2183 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2184 optarg);
2185 exit(1);
2186 }
2187 break;
2188 case 't': /* trace file */
2190 break;
2191 }
2192 }
2193
2194 if (optind < argc)
2195 {
2196 testname = pg_strdup(argv[optind]);
2197 optind++;
2198 }
2199 else
2200 {
2201 usage(argv[0]);
2202 exit(1);
2203 }
2204
2205 if (strcmp(testname, "tests") == 0)
2206 {
2208 exit(0);
2209 }
2210
2211 if (optind < argc)
2212 {
2213 conninfo = pg_strdup(argv[optind]);
2214 optind++;
2215 }
2216
2217 /* Make a connection to the database */
2218 conn = PQconnectdb(conninfo);
2219 if (PQstatus(conn) != CONNECTION_OK)
2220 {
2221 fprintf(stderr, "Connection to database failed: %s\n",
2224 }
2225
2226 res = PQexec(conn, "SET lc_messages TO \"C\"");
2228 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2229 res = PQexec(conn, "SET debug_parallel_query = off");
2231 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2232
2233 /* Set the trace file, if requested */
2234 if (tracefile != NULL)
2235 {
2236 if (strcmp(tracefile, "-") == 0)
2237 trace = stdout;
2238 else
2239 trace = fopen(tracefile, "w");
2240 if (trace == NULL)
2241 pg_fatal("could not open file \"%s\": %m", tracefile);
2242
2243 /* Make it line-buffered */
2244 setvbuf(trace, NULL, PG_IOLBF, 0);
2245
2246 PQtrace(conn, trace);
2249 }
2250
2251 if (strcmp(testname, "cancel") == 0)
2253 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2255 else if (strcmp(testname, "multi_pipelines") == 0)
2257 else if (strcmp(testname, "nosync") == 0)
2259 else if (strcmp(testname, "pipeline_abort") == 0)
2261 else if (strcmp(testname, "pipeline_idle") == 0)
2263 else if (strcmp(testname, "pipelined_insert") == 0)
2264 test_pipelined_insert(conn, numrows);
2265 else if (strcmp(testname, "prepared") == 0)
2267 else if (strcmp(testname, "simple_pipeline") == 0)
2269 else if (strcmp(testname, "singlerow") == 0)
2271 else if (strcmp(testname, "transaction") == 0)
2273 else if (strcmp(testname, "uniqviol") == 0)
2275 else
2276 {
2277 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2278 exit(1);
2279 }
2280
2281 /* close the connection to the database and cleanup */
2282 PQfinish(conn);
2283 return 0;
2284}
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:772
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
void PQtrace(PGconn *conn, FILE *debug_port)
Definition: fe-trace.c:35
void PQsetTraceFlags(PGconn *conn, int flags)
Definition: fe-trace.c:64
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:122
#define PQTRACE_SUPPRESS_TIMESTAMPS
Definition: libpq-fe.h:469
#define PQTRACE_REGRESS_MODE
Definition: libpq-fe.h:471
static void usage(const char *progname)
static void print_test_list(void)
static void exit_nicely(PGconn *conn)
static void test_uniqviol(PGconn *conn)
static void test_simple_pipeline(PGconn *conn)
static char * tracefile
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static void test_nosync(PGconn *conn)
static void test_pipeline_abort(PGconn *conn)
static void test_transaction(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
fprintf(stderr, "\n%s:%d: ", progname, line)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
PGDLLIMPORT int optind
Definition: getopt.c:51
int getopt(int nargc, char *const *nargv, const char *ostr)
Definition: getopt.c:72
PGDLLIMPORT char * optarg
Definition: getopt.c:53
#define PG_IOLBF
Definition: port.h:389
char * c

References conn, CONNECTION_OK, exit(), exit_nicely(), fprintf(), getopt(), optarg, optind, pg_fatal, PG_IOLBF, pg_strdup(), PGRES_COMMAND_OK, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultStatus(), PQsetTraceFlags(), PQstatus(), PQtrace(), PQTRACE_REGRESS_MODE, PQTRACE_SUPPRESS_TIMESTAMPS, print_test_list(), res, generate_unaccent_rules::stdout, test_cancel(), test_disallowed_in_pipeline(), test_multi_pipelines(), test_nosync(), test_pipeline_abort(), test_pipeline_idle(), test_pipelined_insert(), test_prepared(), test_simple_pipeline(), test_singlerowmode(), test_transaction(), test_uniqviol(), tracefile, and usage().

◆ notice_processor()

static void notice_processor ( void *  arg,
const char *  message 
)
static

Definition at line 1411 of file libpq_pipeline.c.

1412{
1413 int *n_notices = (int *) arg;
1414
1415 (*n_notices)++;
1416 fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1417}
void * arg

References arg, and fprintf().

Referenced by test_pipeline_idle().

◆ pg_attribute_noreturn()

static void pg_attribute_noreturn ( )
static

◆ pg_attribute_printf()

static void const char pg_attribute_printf ( 2 ,
3 
)

◆ print_test_list()

static void print_test_list ( void  )
static

Definition at line 2147 of file libpq_pipeline.c.

2148{
2149 printf("cancel\n");
2150 printf("disallowed_in_pipeline\n");
2151 printf("multi_pipelines\n");
2152 printf("nosync\n");
2153 printf("pipeline_abort\n");
2154 printf("pipeline_idle\n");
2155 printf("pipelined_insert\n");
2156 printf("prepared\n");
2157 printf("simple_pipeline\n");
2158 printf("singlerow\n");
2159 printf("transaction\n");
2160 printf("uniqviol\n");
2161}
#define printf(...)
Definition: port.h:245

References printf.

Referenced by main().

◆ process_result()

static bool process_result ( PGconn *  conn,
PGresult *  res,
int  results,
int  numsent 
)
static

Definition at line 2087 of file libpq_pipeline.c.

2088{
2089 PGresult *res2;
2090 bool got_error = false;
2091
2092 if (res == NULL)
2093 pg_fatal("got unexpected NULL");
2094
2095 switch (PQresultStatus(res))
2096 {
2097 case PGRES_FATAL_ERROR:
2098 got_error = true;
2099 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2100 PQclear(res);
2101
2102 res2 = PQgetResult(conn);
2103 if (res2 != NULL)
2104 pg_fatal("expected NULL, got %s",
2106 break;
2107
2108 case PGRES_TUPLES_OK:
2109 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2110 PQclear(res);
2111
2112 res2 = PQgetResult(conn);
2113 if (res2 != NULL)
2114 pg_fatal("expected NULL, got %s",
2116 break;
2117
2119 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2120 res2 = PQgetResult(conn);
2121 if (res2 != NULL)
2122 pg_fatal("expected NULL, got %s",
2124 break;
2125
2126 default:
2127 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2128 }
2129
2130 return got_error;
2131}
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * PQresStatus(ExecStatusType status)
Definition: fe-exec.c:3419
@ PGRES_PIPELINE_ABORTED
Definition: libpq-fe.h:137
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:125

References conn, fprintf(), pg_fatal, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetResult(), PQgetvalue(), PQresStatus(), PQresultStatus(), and res.

Referenced by test_uniqviol().

◆ send_cancellable_query_impl()

static void send_cancellable_query_impl ( int  line,
PGconn *  conn,
PGconn *  monitorConn 
)
static

Definition at line 173 of file libpq_pipeline.c.

174{
175 const char *env_wait;
176 const Oid paramTypes[1] = {INT4OID};
177
178 /*
179 * Wait for the connection to be idle, so that our check for an active
180 * connection below is reliable, instead of possibly seeing an outdated
181 * state.
182 */
183 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
184
185 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
186 if (env_wait == NULL)
187 env_wait = "180";
188
189 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
190 &env_wait, NULL, NULL, 0) != 1)
191 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
192
193 /*
194 * Wait for the sleep to be active, because if the query is not running
195 * yet, the cancel request that we send won't have any effect.
196 */
197 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
198}
int PQbackendPID(const PGconn *conn)
Definition: fe-connect.c:7543
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
unsigned int Oid
Definition: postgres_ext.h:32

References conn, PQbackendPID(), PQerrorMessage(), PQsendQueryParams(), and wait_for_connection_state().

◆ test_cancel()

static void test_cancel ( PGconn *  conn)
static

Definition at line 243 of file libpq_pipeline.c.

244{
245 PGcancel *cancel;
247 PGconn *monitorConn;
248 char errorbuf[256];
249
250 fprintf(stderr, "test cancellations... ");
251
252 if (PQsetnonblocking(conn, 1) != 0)
253 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
254
255 /*
256 * Make a separate connection to the database to monitor the query on the
257 * main connection.
258 */
259 monitorConn = copy_connection(conn);
260 Assert(PQstatus(monitorConn) == CONNECTION_OK);
261
262 /* test PQcancel */
263 send_cancellable_query(conn, monitorConn);
264 cancel = PQgetCancel(conn);
265 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
266 pg_fatal("failed to run PQcancel: %s", errorbuf);
268
269 /* PGcancel object can be reused for the next query */
270 send_cancellable_query(conn, monitorConn);
271 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
272 pg_fatal("failed to run PQcancel: %s", errorbuf);
274
275 PQfreeCancel(cancel);
276
277 /* test PQrequestCancel */
278 send_cancellable_query(conn, monitorConn);
279 if (!PQrequestCancel(conn))
280 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
282
283 /* test PQcancelBlocking */
284 send_cancellable_query(conn, monitorConn);
287 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
290
291 /* test PQcancelCreate and then polling with PQcancelPoll */
292 send_cancellable_query(conn, monitorConn);
295 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
296 while (true)
297 {
298 struct timeval tv;
299 fd_set input_mask;
300 fd_set output_mask;
302 int sock = PQcancelSocket(cancelConn);
303
304 if (pollres == PGRES_POLLING_OK)
305 break;
306
307 FD_ZERO(&input_mask);
308 FD_ZERO(&output_mask);
309 switch (pollres)
310 {
312 pg_debug("polling for reads\n");
313 FD_SET(sock, &input_mask);
314 break;
316 pg_debug("polling for writes\n");
317 FD_SET(sock, &output_mask);
318 break;
319 default:
320 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
321 }
322
323 if (sock < 0)
324 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
325
326 tv.tv_sec = 3;
327 tv.tv_usec = 0;
328
329 while (true)
330 {
331 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
332 {
333 if (errno == EINTR)
334 continue;
335 pg_fatal("select() failed: %m");
336 }
337 break;
338 }
339 }
341 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
343
344 /*
345 * test PQcancelReset works on the cancel connection and it can be reused
346 * afterwards
347 */
349
350 send_cancellable_query(conn, monitorConn);
352 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
353 while (true)
354 {
355 struct timeval tv;
356 fd_set input_mask;
357 fd_set output_mask;
359 int sock = PQcancelSocket(cancelConn);
360
361 if (pollres == PGRES_POLLING_OK)
362 break;
363
364 FD_ZERO(&input_mask);
365 FD_ZERO(&output_mask);
366 switch (pollres)
367 {
369 pg_debug("polling for reads\n");
370 FD_SET(sock, &input_mask);
371 break;
373 pg_debug("polling for writes\n");
374 FD_SET(sock, &output_mask);
375 break;
376 default:
377 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
378 }
379
380 if (sock < 0)
381 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
382
383 tv.tv_sec = 3;
384 tv.tv_usec = 0;
385
386 while (true)
387 {
388 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
389 {
390 if (errno == EINTR)
391 continue;
392 pg_fatal("select() failed: %m");
393 }
394 break;
395 }
396 }
398 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
400
402
403 fprintf(stderr, "ok\n");
404}
static PGcancel *volatile cancelConn
Definition: cancel.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:349
void PQcancelReset(PGcancelConn *cancelConn)
Definition: fe-cancel.c:318
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:283
int PQcancelBlocking(PGcancelConn *cancelConn)
Definition: fe-cancel.c:171
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:463
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:334
int PQrequestCancel(PGconn *conn)
Definition: fe-cancel.c:661
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:417
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:306
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
int PQsetnonblocking(PGconn *conn, int arg)
Definition: fe-exec.c:3944
PostgresPollingStatusType
Definition: libpq-fe.h:111
@ PGRES_POLLING_OK
Definition: libpq-fe.h:115
@ PGRES_POLLING_READING
Definition: libpq-fe.h:113
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:114
#define confirm_query_canceled(conn)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_debug(...)
#define send_cancellable_query(conn, monitorConn)
static PGconn * copy_connection(PGconn *conn)
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503

References Assert(), cancelConn, confirm_query_canceled, conn, CONNECTION_OK, copy_connection(), EINTR, fprintf(), pg_debug, pg_fatal, PGRES_POLLING_OK, PGRES_POLLING_READING, PGRES_POLLING_WRITING, PQcancel(), PQcancelBlocking(), PQcancelCreate(), PQcancelErrorMessage(), PQcancelFinish(), PQcancelPoll(), PQcancelReset(), PQcancelSocket(), PQcancelStart(), PQcancelStatus(), PQerrorMessage(), PQfreeCancel(), PQgetCancel(), PQrequestCancel(), PQsetnonblocking(), PQstatus(), select, and send_cancellable_query.

Referenced by main().

◆ test_disallowed_in_pipeline()

static void test_disallowed_in_pipeline ( PGconn *  conn)
static

Definition at line 407 of file libpq_pipeline.c.

408{
409 PGresult *res = NULL;
410
411 fprintf(stderr, "test error cases... ");
412
414 pg_fatal("Expected blocking connection mode");
415
416 if (PQenterPipelineMode(conn) != 1)
417 pg_fatal("Unable to enter pipeline mode");
418
420 pg_fatal("Pipeline mode not activated properly");
421
422 /* PQexec should fail in pipeline mode */
423 res = PQexec(conn, "SELECT 1");
425 pg_fatal("PQexec should fail in pipeline mode but succeeded");
426 if (strcmp(PQerrorMessage(conn),
427 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
428 pg_fatal("did not get expected error message; got: \"%s\"",
430
431 /* PQsendQuery should fail in pipeline mode */
432 if (PQsendQuery(conn, "SELECT 1") != 0)
433 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
434 if (strcmp(PQerrorMessage(conn),
435 "PQsendQuery not allowed in pipeline mode\n") != 0)
436 pg_fatal("did not get expected error message; got: \"%s\"",
438
439 /* Entering pipeline mode when already in pipeline mode is OK */
440 if (PQenterPipelineMode(conn) != 1)
441 pg_fatal("re-entering pipeline mode should be a no-op but failed");
442
443 if (PQisBusy(conn) != 0)
444 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
445
446 /* ok, back to normal command mode */
447 if (PQexitPipelineMode(conn) != 1)
448 pg_fatal("couldn't exit idle empty pipeline mode");
449
451 pg_fatal("Pipeline mode not terminated properly");
452
453 /* exiting pipeline mode when not in pipeline mode should be a no-op */
454 if (PQexitPipelineMode(conn) != 1)
455 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
456
457 /* can now PQexec again */
458 res = PQexec(conn, "SELECT 1");
460 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
462
463 fprintf(stderr, "ok\n");
464}
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
Definition: fe-connect.c:7551
int PQexitPipelineMode(PGconn *conn)
Definition: fe-exec.c:3073
int PQenterPipelineMode(PGconn *conn)
Definition: fe-exec.c:3042
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisnonblocking(const PGconn *conn)
Definition: fe-exec.c:3983
@ PQ_PIPELINE_OFF
Definition: libpq-fe.h:184

References conn, fprintf(), pg_fatal, PGRES_FATAL_ERROR, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQisBusy(), PQisnonblocking(), PQpipelineStatus(), PQresultStatus(), PQsendQuery(), and res.

Referenced by main().

◆ test_multi_pipelines()

static void test_multi_pipelines ( PGconn * conn)
static

Definition at line 467 of file libpq_pipeline.c.

468{
469 PGresult *res = NULL;
470 const char *dummy_params[1] = {"1"};
471 Oid dummy_param_oids[1] = {INT4OID};
472
473 fprintf(stderr, "multi pipeline... ");
474
475 /*
476 * Queue up a couple of small pipelines and process each without returning
477 * to command mode first.
478 */
479 if (PQenterPipelineMode(conn) != 1)
480 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
481
482 /* first pipeline */
483 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
484 dummy_params, NULL, NULL, 0) != 1)
485 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
486
487 if (PQpipelineSync(conn) != 1)
488 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
489
490 /* second pipeline */
491 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
492 dummy_params, NULL, NULL, 0) != 1)
493 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
494
495 /* Skip flushing once. */
496 if (PQsendPipelineSync(conn) != 1)
497 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
498
499 /* third pipeline */
500 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
501 dummy_params, NULL, NULL, 0) != 1)
502 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
503
504 if (PQpipelineSync(conn) != 1)
505 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
506
507 /* OK, start processing the results */
508
509 /* first pipeline */
510
512 if (res == NULL)
513 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
515
517 pg_fatal("Unexpected result code %s from first pipeline item",
519 PQclear(res);
520 res = NULL;
521
522 if (PQgetResult(conn) != NULL)
523 pg_fatal("PQgetResult returned something extra after first result");
524
525 if (PQexitPipelineMode(conn) != 0)
526 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
527
529 if (res == NULL)
530 pg_fatal("PQgetResult returned null when sync result expected: %s",
532
534 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
536 PQclear(res);
537
538 /* second pipeline */
539
541 if (res == NULL)
542 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
544
546 pg_fatal("Unexpected result code %s from second pipeline item",
548 PQclear(res);
549 res = NULL;
550
551 if (PQgetResult(conn) != NULL)
552 pg_fatal("PQgetResult returned something extra after first result");
553
554 if (PQexitPipelineMode(conn) != 0)
555 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
556
558 if (res == NULL)
559 pg_fatal("PQgetResult returned null when sync result expected: %s",
561
563 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
565 PQclear(res);
566
567 /* third pipeline */
568
570 if (res == NULL)
571 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
573
575 pg_fatal("Unexpected result code %s from third pipeline item",
577
579 if (res != NULL)
580 pg_fatal("Expected null result, got %s",
582
584 if (res == NULL)
585 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
587
589 pg_fatal("Unexpected result code %s from second pipeline sync",
591
592 /* We're still in pipeline mode ... */
594 pg_fatal("Fell out of pipeline mode somehow");
595
596 /* until we end it, which we can safely do now */
597 if (PQexitPipelineMode(conn) != 1)
598 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
600
602 pg_fatal("exiting pipeline mode didn't seem to work");
603
604 fprintf(stderr, "ok\n");
605}
int PQsendPipelineSync(PGconn *conn)
Definition: fe-exec.c:3282
int PQpipelineSync(PGconn *conn)
Definition: fe-exec.c:3272
@ PGRES_PIPELINE_SYNC
Definition: libpq-fe.h:136

References conn, fprintf(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPipelineSync(), PQsendQueryParams(), and res.

Referenced by main().

◆ test_nosync()

static void test_nosync ( PGconn * conn)
static

Definition at line 612 of file libpq_pipeline.c.

613{
614 int numqueries = 10;
615 int results = 0;
616 int sock = PQsocket(conn);
617
618 fprintf(stderr, "nosync... ");
619
620 if (sock < 0)
621 pg_fatal("invalid socket");
622
623 if (PQenterPipelineMode(conn) != 1)
624 pg_fatal("could not enter pipeline mode");
625 for (int i = 0; i < numqueries; i++)
626 {
627 fd_set input_mask;
628 struct timeval tv;
629
630 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
631 0, NULL, NULL, NULL, NULL, 0) != 1)
632 pg_fatal("error sending select: %s", PQerrorMessage(conn));
633 PQflush(conn);
634
635 /*
636 * If the server has written anything to us, read (some of) it now.
637 */
638 FD_ZERO(&input_mask);
639 FD_SET(sock, &input_mask);
640 tv.tv_sec = 0;
641 tv.tv_usec = 0;
642 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
643 {
644 fprintf(stderr, "select() failed: %m\n");
646 }
647 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
648 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
649 }
650
651 /* tell server to flush its output buffer */
652 if (PQsendFlushRequest(conn) != 1)
653 pg_fatal("failed to send flush request");
654 PQflush(conn);
655
656 /* Now read all results */
657 for (;;)
658 {
659 PGresult *res;
660
662
663 /* NULL results are only expected after TUPLES_OK */
664 if (res == NULL)
665 pg_fatal("got unexpected NULL result after %d results", results);
666
667 /* We expect exactly one TUPLES_OK result for each query we sent */
669 {
670 PGresult *res2;
671
672 /* and one NULL result should follow each */
673 res2 = PQgetResult(conn);
674 if (res2 != NULL)
675 pg_fatal("expected NULL, got %s",
677 PQclear(res);
678 results++;
679
680 /* if we're done, we're done */
681 if (results == numqueries)
682 break;
683
684 continue;
685 }
686
687 /* anything else is unexpected */
688 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
689 }
690
691 fprintf(stderr, "ok\n");
692}
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7533
int PQflush(PGconn *conn)
Definition: fe-exec.c:4000
int PQsendFlushRequest(PGconn *conn)
Definition: fe-exec.c:3371

References conn, exit_nicely(), fprintf(), i, pg_fatal, PGRES_TUPLES_OK, PQclear(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQflush(), PQgetResult(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsocket(), res, and select.

Referenced by main().

◆ test_pipeline_abort()

static void test_pipeline_abort ( PGconn * conn)
static

Definition at line 704 of file libpq_pipeline.c.

705{
706 PGresult *res = NULL;
707 const char *dummy_params[1] = {"1"};
708 Oid dummy_param_oids[1] = {INT4OID};
709 int i;
710 int gotrows;
711 bool goterror;
712
713 fprintf(stderr, "aborted pipeline... ");
714
717 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
718
721 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
722
723 /*
724 * Queue up a couple of small pipelines and process each without returning
725 * to command mode first. Make sure the second operation in the first
726 * pipeline ERRORs.
727 */
728 if (PQenterPipelineMode(conn) != 1)
729 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
730
731 dummy_params[0] = "1";
732 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
733 dummy_params, NULL, NULL, 0) != 1)
734 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
735
736 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
737 1, dummy_param_oids, dummy_params,
738 NULL, NULL, 0) != 1)
739 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
740
741 dummy_params[0] = "2";
742 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
743 dummy_params, NULL, NULL, 0) != 1)
744 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
745
746 if (PQpipelineSync(conn) != 1)
747 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
748
749 dummy_params[0] = "3";
750 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
751 dummy_params, NULL, NULL, 0) != 1)
752 pg_fatal("dispatching second-pipeline insert failed: %s",
754
755 if (PQpipelineSync(conn) != 1)
756 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
757
758 /*
759 * OK, start processing the pipeline results.
760 *
761 * We should get a command-ok for the first query, then a fatal error and
762 * a pipeline aborted message for the second insert, a pipeline-end, then
763 * a command-ok and a pipeline-ok for the second pipeline operation.
764 */
766 if (res == NULL)
767 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
769 pg_fatal("Unexpected result status %s: %s",
772 PQclear(res);
773
774 /* NULL result to signal end-of-results for this command */
775 if ((res = PQgetResult(conn)) != NULL)
776 pg_fatal("Expected null result, got %s",
778
779 /* Second query caused error, so we expect an error next */
781 if (res == NULL)
782 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
784 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
786 PQclear(res);
787
788 /* NULL result to signal end-of-results for this command */
789 if ((res = PQgetResult(conn)) != NULL)
790 pg_fatal("Expected null result, got %s",
792
793 /*
794 * pipeline should now be aborted.
795 *
796 * Note that we could still queue more queries at this point if we wanted;
797 * they'd get added to a new third pipeline since we've already sent a
798 * second. The aborted flag relates only to the pipeline being received.
799 */
801 pg_fatal("pipeline should be flagged as aborted but isn't");
802
803 /* third query in pipeline, the second insert */
805 if (res == NULL)
806 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
808 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
810 PQclear(res);
811
812 /* NULL result to signal end-of-results for this command */
813 if ((res = PQgetResult(conn)) != NULL)
814 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
815
817 pg_fatal("pipeline should be flagged as aborted but isn't");
818
819 /* Ensure we're still in pipeline */
821 pg_fatal("Fell out of pipeline mode somehow");
822
823 /*
824 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
825 *
826 * (This is so clients know to start processing results normally again and
827 * can tell the difference between skipped commands and the sync.)
828 */
830 if (res == NULL)
831 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
833 pg_fatal("Unexpected result code from first pipeline sync\n"
834 "Expected PGRES_PIPELINE_SYNC, got %s",
836 PQclear(res);
837
839 pg_fatal("sync should've cleared the aborted flag but didn't");
840
841 /* We're still in pipeline mode... */
843 pg_fatal("Fell out of pipeline mode somehow");
844
845 /* the insert from the second pipeline */
847 if (res == NULL)
848 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
850 pg_fatal("Unexpected result code %s from first item in second pipeline",
852 PQclear(res);
853
854 /* Read the NULL result at the end of the command */
855 if ((res = PQgetResult(conn)) != NULL)
856 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
857
858 /* the second pipeline sync */
859 if ((res = PQgetResult(conn)) == NULL)
860 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
862 pg_fatal("Unexpected result code %s from second pipeline sync",
864 PQclear(res);
865
866 if ((res = PQgetResult(conn)) != NULL)
867 pg_fatal("Expected null result, got %s: %s",
870
871 /* Try to send two queries in one command */
872 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
873 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
874 if (PQpipelineSync(conn) != 1)
875 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
876 goterror = false;
877 while ((res = PQgetResult(conn)) != NULL)
878 {
879 switch (PQresultStatus(res))
880 {
882 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
883 pg_fatal("expected error about multiple commands, got %s",
885 printf("got expected %s", PQerrorMessage(conn));
886 goterror = true;
887 break;
888 default:
889 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
890 break;
891 }
892 }
893 if (!goterror)
894 pg_fatal("did not get cannot-insert-multiple-commands error");
896 if (res == NULL)
897 pg_fatal("got NULL result");
899 pg_fatal("Unexpected result code %s from pipeline sync",
901 fprintf(stderr, "ok\n");
902
903 /* Test single-row mode with an error partway through */
904 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
905 0, NULL, NULL, NULL, NULL, 0) != 1)
906 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
907 if (PQpipelineSync(conn) != 1)
908 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
910 goterror = false;
911 gotrows = 0;
912 while ((res = PQgetResult(conn)) != NULL)
913 {
914 switch (PQresultStatus(res))
915 {
917 printf("got row: %s\n", PQgetvalue(res, 0, 0));
918 gotrows++;
919 break;
921 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
922 pg_fatal("expected division-by-zero, got: %s (%s)",
925 printf("got expected division-by-zero\n");
926 goterror = true;
927 break;
928 default:
929 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
930 }
931 PQclear(res);
932 }
933 if (!goterror)
934 pg_fatal("did not get division-by-zero error");
935 if (gotrows != 3)
936 pg_fatal("did not get three rows");
937 /* the third pipeline sync */
938 if ((res = PQgetResult(conn)) == NULL)
939 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
941 pg_fatal("Unexpected result code %s from third pipeline sync",
943 PQclear(res);
944
945 /* We're still in pipeline mode... */
947 pg_fatal("Fell out of pipeline mode somehow");
948
949 /* until we end it, which we can safely do now */
950 if (PQexitPipelineMode(conn) != 1)
951 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
953
955 pg_fatal("exiting pipeline mode didn't seem to work");
956
957 /*-
958 * Since we fired the pipelines off without a surrounding xact, the results
959 * should be:
960 *
961 * - Implicit xact started by server around 1st pipeline
962 * - First insert applied
963 * - Second statement aborted xact
964 * - Third insert skipped
965 * - Sync rolled back first implicit xact
966 * - Implicit xact created by server around 2nd pipeline
967 * - insert applied from 2nd pipeline
968 * - Sync commits 2nd xact
969 *
970 * So we should only have the value 3 that we inserted.
971 */
972 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
973
975 pg_fatal("Expected tuples, got %s: %s",
977 if (PQntuples(res) != 1)
978 pg_fatal("expected 1 result, got %d", PQntuples(res));
979 for (i = 0; i < PQntuples(res); i++)
980 {
981 const char *val = PQgetvalue(res, i, 0);
982
983 if (strcmp(val, "3") != 0)
984 pg_fatal("expected only insert with value 3, got %s", val);
985 }
986
987 PQclear(res);
988
989 fprintf(stderr, "ok\n");
990}
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
long val
Definition: informix.c:689
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:135
@ PQ_PIPELINE_ABORTED
Definition: libpq-fe.h:186
static const char *const create_table_sql
static const char *const insert_sql
static const char *const drop_table_sql

References conn, create_table_sql, drop_table_sql, fprintf(), i, insert_sql, PG_DIAG_SQLSTATE, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQ_PIPELINE_ABORTED, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultErrorField(), PQresultErrorMessage(), PQresultStatus(), PQsendQueryParams(), PQsetSingleRowMode(), printf, res, and val.

Referenced by main().

◆ test_pipeline_idle()

static void test_pipeline_idle ( PGconn * conn)
static

Definition at line 1421 of file libpq_pipeline.c.

1422{
1423 PGresult *res;
1424 int n_notices = 0;
1425
1426 fprintf(stderr, "\npipeline idle...\n");
1427
1429
1430 /* Try to exit pipeline mode in pipeline-idle state */
1431 if (PQenterPipelineMode(conn) != 1)
1432 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1433 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1434 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1436 res = PQgetResult(conn);
1437 if (res == NULL)
1438 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441 pg_fatal("unexpected result code %s from first pipeline item",
1443 PQclear(res);
1444 res = PQgetResult(conn);
1445 if (res != NULL)
1446 pg_fatal("did not receive terminating NULL");
1447 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1448 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1449 if (PQexitPipelineMode(conn) == 1)
1450 pg_fatal("exiting pipeline succeeded when it shouldn't");
1451 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1452 strlen("cannot exit pipeline mode")) != 0)
1453 pg_fatal("did not get expected error; got: %s",
1456 res = PQgetResult(conn);
1458 pg_fatal("unexpected result code %s from second pipeline item",
1460 PQclear(res);
1461 res = PQgetResult(conn);
1462 if (res != NULL)
1463 pg_fatal("did not receive terminating NULL");
1464 if (PQexitPipelineMode(conn) != 1)
1465 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1466
1467 if (n_notices > 0)
1468 pg_fatal("got %d notice(s)", n_notices);
1469 fprintf(stderr, "ok - 1\n");
1470
1471 /* Have a WARNING in the middle of a resultset */
1472 if (PQenterPipelineMode(conn) != 1)
1473 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1474 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1475 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1477 res = PQgetResult(conn);
1478 if (res == NULL)
1479 pg_fatal("unexpected NULL result received");
1481 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1482 if (PQexitPipelineMode(conn) != 1)
1483 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1484 fprintf(stderr, "ok - 2\n");
1485}
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
Definition: fe-connect.c:7688
static void notice_processor(void *arg, const char *message)

References conn, fprintf(), notice_processor(), pg_fatal, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsetNoticeProcessor(), and res.

Referenced by main().

◆ test_pipelined_insert()

static void test_pipelined_insert ( PGconn * conn,
int  n_rows 
)
static

Definition at line 1006 of file libpq_pipeline.c.

1007{
1008 Oid insert_param_oids[2] = {INT4OID, INT8OID};
1009 const char *insert_params[2];
1010 char insert_param_0[MAXINTLEN];
1011 char insert_param_1[MAXINT8LEN];
1012 enum PipelineInsertStep send_step = BI_BEGIN_TX,
1013 recv_step = BI_BEGIN_TX;
1014 int rows_to_send,
1015 rows_to_receive;
1016
1017 insert_params[0] = insert_param_0;
1018 insert_params[1] = insert_param_1;
1019
1020 rows_to_send = rows_to_receive = n_rows;
1021
1022 /*
1023 * Do a pipelined insert into a table created at the start of the pipeline
1024 */
1025 if (PQenterPipelineMode(conn) != 1)
1026 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1027
1028 while (send_step != BI_PREPARE)
1029 {
1030 const char *sql;
1031
1032 switch (send_step)
1033 {
1034 case BI_BEGIN_TX:
1035 sql = "BEGIN TRANSACTION";
1036 send_step = BI_DROP_TABLE;
1037 break;
1038
1039 case BI_DROP_TABLE:
1040 sql = drop_table_sql;
1041 send_step = BI_CREATE_TABLE;
1042 break;
1043
1044 case BI_CREATE_TABLE:
1045 sql = create_table_sql;
1046 send_step = BI_PREPARE;
1047 break;
1048
1049 default:
1050 pg_fatal("invalid state");
1051 sql = NULL; /* keep compiler quiet */
1052 }
1053
1054 pg_debug("sending: %s\n", sql);
1055 if (PQsendQueryParams(conn, sql,
1056 0, NULL, NULL, NULL, NULL, 0) != 1)
1057 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1058 }
1059
1060 Assert(send_step == BI_PREPARE);
1061 pg_debug("sending: %s\n", insert_sql2);
1062 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1063 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1064 send_step = BI_INSERT_ROWS;
1065
1066 /*
1067 * Now we start inserting. We'll be sending enough data that we could fill
1068 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1069 * mode and consume input while we send more output. As results of each
1070 * query are processed we should pop them to allow processing of the next
1071 * query. There's no need to finish the pipeline before processing
1072 * results.
1073 */
1074 if (PQsetnonblocking(conn, 1) != 0)
1075 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1076
1077 while (recv_step != BI_DONE)
1078 {
1079 int sock;
1080 fd_set input_mask;
1081 fd_set output_mask;
1082
1083 sock = PQsocket(conn);
1084
1085 if (sock < 0)
1086 break; /* shouldn't happen */
1087
1088 FD_ZERO(&input_mask);
1089 FD_SET(sock, &input_mask);
1090 FD_ZERO(&output_mask);
1091 FD_SET(sock, &output_mask);
1092
1093 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1094 {
1095 fprintf(stderr, "select() failed: %m\n");
1097 }
1098
1099 /*
1100 * Process any results, so we keep the server's output buffer free
1101 * flowing and it can continue to process input
1102 */
1103 if (FD_ISSET(sock, &input_mask))
1104 {
1106
1107 /* Read until we'd block if we tried to read */
1108 while (!PQisBusy(conn) && recv_step < BI_DONE)
1109 {
1110 PGresult *res;
1111 const char *cmdtag = "";
1112 const char *description = "";
1113 int status;
1114
1115 /*
1116 * Read next result. If no more results from this query,
1117 * advance to the next query
1118 */
1119 res = PQgetResult(conn);
1120 if (res == NULL)
1121 continue;
1122
1123 status = PGRES_COMMAND_OK;
1124 switch (recv_step)
1125 {
1126 case BI_BEGIN_TX:
1127 cmdtag = "BEGIN";
1128 recv_step++;
1129 break;
1130 case BI_DROP_TABLE:
1131 cmdtag = "DROP TABLE";
1132 recv_step++;
1133 break;
1134 case BI_CREATE_TABLE:
1135 cmdtag = "CREATE TABLE";
1136 recv_step++;
1137 break;
1138 case BI_PREPARE:
1139 cmdtag = "";
1140 description = "PREPARE";
1141 recv_step++;
1142 break;
1143 case BI_INSERT_ROWS:
1144 cmdtag = "INSERT";
1145 rows_to_receive--;
1146 if (rows_to_receive == 0)
1147 recv_step++;
1148 break;
1149 case BI_COMMIT_TX:
1150 cmdtag = "COMMIT";
1151 recv_step++;
1152 break;
1153 case BI_SYNC:
1154 cmdtag = "";
1155 description = "SYNC";
1156 status = PGRES_PIPELINE_SYNC;
1157 recv_step++;
1158 break;
1159 case BI_DONE:
1160 /* unreachable */
1161 pg_fatal("unreachable state");
1162 }
1163
1164 if (PQresultStatus(res) != status)
1165 pg_fatal("%s reported status %s, expected %s\n"
1166 "Error message: \"%s\"",
1168 PQresStatus(status), PQerrorMessage(conn));
1169
1170 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1171 pg_fatal("%s expected command tag '%s', got '%s'",
1172 description, cmdtag, PQcmdStatus(res));
1173
1174 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1175
1176 PQclear(res);
1177 }
1178 }
1179
1180 /* Write more rows and/or the end pipeline message, if needed */
1181 if (FD_ISSET(sock, &output_mask))
1182 {
1183 PQflush(conn);
1184
1185 if (send_step == BI_INSERT_ROWS)
1186 {
1187 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1188 /* use up some buffer space with a wide value */
1189 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1190
1191 if (PQsendQueryPrepared(conn, "my_insert",
1192 2, insert_params, NULL, NULL, 0) == 1)
1193 {
1194 pg_debug("sent row %d\n", rows_to_send);
1195
1196 rows_to_send--;
1197 if (rows_to_send == 0)
1198 send_step++;
1199 }
1200 else
1201 {
1202 /*
1203 * in nonblocking mode, so it's OK for an insert to fail
1204 * to send
1205 */
1206 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1207 rows_to_send, PQerrorMessage(conn));
1208 }
1209 }
1210 else if (send_step == BI_COMMIT_TX)
1211 {
1212 if (PQsendQueryParams(conn, "COMMIT",
1213 0, NULL, NULL, NULL, NULL, 0) == 1)
1214 {
1215 pg_debug("sent COMMIT\n");
1216 send_step++;
1217 }
1218 else
1219 {
1220 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1222 }
1223 }
1224 else if (send_step == BI_SYNC)
1225 {
1226 if (PQpipelineSync(conn) == 1)
1227 {
1228 fprintf(stdout, "pipeline sync sent\n");
1229 send_step++;
1230 }
1231 else
1232 {
1233 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1235 }
1236 }
1237 }
1238 }
1239
1240 /* We've got the sync message and the pipeline should be done */
1241 if (PQexitPipelineMode(conn) != 1)
1242 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1244
1245 if (PQsetnonblocking(conn, 0) != 0)
1246 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1247
1248 fprintf(stderr, "ok\n");
1249}
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1536
char * PQcmdStatus(PGresult *res)
Definition: fe-exec.c:3752
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
#define MAXINT8LEN
static const char *const insert_sql2
#define MAXINTLEN
PipelineInsertStep
#define snprintf
Definition: port.h:239
const char * description

References Assert(), BI_BEGIN_TX, BI_COMMIT_TX, BI_CREATE_TABLE, BI_DONE, BI_DROP_TABLE, BI_INSERT_ROWS, BI_PREPARE, BI_SYNC, conn, create_table_sql, description, drop_table_sql, exit_nicely(), fprintf(), insert_sql2, MAXINT8LEN, MAXINTLEN, pg_debug, pg_fatal, PGRES_COMMAND_OK, PGRES_PIPELINE_SYNC, PQclear(), PQcmdStatus(), PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQflush(), PQgetResult(), PQisBusy(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), res, select, snprintf, and generate_unaccent_rules::stdout.

Referenced by main().

◆ test_prepared()

static void test_prepared ( PGconn * conn)
static

Definition at line 1252 of file libpq_pipeline.c.

1253{
1254 PGresult *res = NULL;
1255 Oid param_oids[1] = {INT4OID};
1256 Oid expected_oids[4];
1257 Oid typ;
1258
1259 fprintf(stderr, "prepared... ");
1260
1261 if (PQenterPipelineMode(conn) != 1)
1262 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1263 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1264 "interval '1 sec'",
1265 1, param_oids) != 1)
1266 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1267 expected_oids[0] = INT4OID;
1268 expected_oids[1] = TEXTOID;
1269 expected_oids[2] = NUMERICOID;
1270 expected_oids[3] = INTERVALOID;
1271 if (PQsendDescribePrepared(conn, "select_one") != 1)
1272 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1273 if (PQpipelineSync(conn) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1275
1276 res = PQgetResult(conn);
1277 if (res == NULL)
1278 pg_fatal("PQgetResult returned null");
1280 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1281 PQclear(res);
1282 res = PQgetResult(conn);
1283 if (res != NULL)
1284 pg_fatal("expected NULL result");
1285
1286 res = PQgetResult(conn);
1287 if (res == NULL)
1288 pg_fatal("PQgetResult returned NULL");
1290 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1291 if (PQnfields(res) != lengthof(expected_oids))
1292 pg_fatal("expected %zu columns, got %d",
1293 lengthof(expected_oids), PQnfields(res));
1294 for (int i = 0; i < PQnfields(res); i++)
1295 {
1296 typ = PQftype(res, i);
1297 if (typ != expected_oids[i])
1298 pg_fatal("field %d: expected type %u, got %u",
1299 i, expected_oids[i], typ);
1300 }
1301 PQclear(res);
1302 res = PQgetResult(conn);
1303 if (res != NULL)
1304 pg_fatal("expected NULL result");
1305
1306 res = PQgetResult(conn);
1308 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1309
1310 fprintf(stderr, "closing statement..");
1311 if (PQsendClosePrepared(conn, "select_one") != 1)
1312 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1313 if (PQpipelineSync(conn) != 1)
1314 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1315
1316 res = PQgetResult(conn);
1317 if (res == NULL)
1318 pg_fatal("expected non-NULL result");
1320 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1321 PQclear(res);
1322 res = PQgetResult(conn);
1323 if (res != NULL)
1324 pg_fatal("expected NULL result");
1325 res = PQgetResult(conn);
1327 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1328
1329 if (PQexitPipelineMode(conn) != 1)
1330 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1331
1332 /* Now that it's closed we should get an error when describing */
1333 res = PQdescribePrepared(conn, "select_one");
1335 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1336
1337 /*
1338 * Also test the blocking close, this should not fail since closing a
1339 * non-existent prepared statement is a no-op
1340 */
1341 res = PQclosePrepared(conn, "select_one");
1343 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1344
1345 fprintf(stderr, "creating portal... ");
1346 PQexec(conn, "BEGIN");
1347 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1349 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1350 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1351 if (PQpipelineSync(conn) != 1)
1352 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353 res = PQgetResult(conn);
1354 if (res == NULL)
1355 pg_fatal("PQgetResult returned null");
1357 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1358
1359 typ = PQftype(res, 0);
1360 if (typ != INT4OID)
1361 pg_fatal("portal: expected type %u, got %u",
1362 INT4OID, typ);
1363 PQclear(res);
1364 res = PQgetResult(conn);
1365 if (res != NULL)
1366 pg_fatal("expected NULL result");
1367 res = PQgetResult(conn);
1369 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1370
1371 fprintf(stderr, "closing portal... ");
1372 if (PQsendClosePortal(conn, "cursor_one") != 1)
1373 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1374 if (PQpipelineSync(conn) != 1)
1375 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1376
1377 res = PQgetResult(conn);
1378 if (res == NULL)
1379 pg_fatal("expected non-NULL result");
1381 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1382 PQclear(res);
1383 res = PQgetResult(conn);
1384 if (res != NULL)
1385 pg_fatal("expected NULL result");
1386 res = PQgetResult(conn);
1388 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1389
1390 if (PQexitPipelineMode(conn) != 1)
1391 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1392
1393 /* Now that it's closed we should get an error when describing */
1394 res = PQdescribePortal(conn, "cursor_one");
1396 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1397
1398 /*
1399 * Also test the blocking close, this should not fail since closing a
1400 * non-existent portal is a no-op
1401 */
1402 res = PQclosePortal(conn, "cursor_one");
1404 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1405
1406 fprintf(stderr, "ok\n");
1407}
#define lengthof(array)
Definition: c.h:745
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2455
int PQsendClosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2569
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2521
PGresult * PQclosePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2539
int PQsendClosePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2556
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
Definition: fe-exec.c:2491
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2474
int PQsendDescribePortal(PGconn *conn, const char *portal)
Definition: fe-exec.c:2504
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489

References conn, fprintf(), i, lengthof, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_SYNC, PQclear(), PQclosePortal(), PQclosePrepared(), PQdescribePortal(), PQdescribePrepared(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQftype(), PQgetResult(), PQnfields(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendClosePortal(), PQsendClosePrepared(), PQsendDescribePortal(), PQsendDescribePrepared(), PQsendPrepare(), and res.

Referenced by main().

◆ test_simple_pipeline()

static void test_simple_pipeline ( PGconn *  conn)
static

Definition at line 1488 of file libpq_pipeline.c.

/*
 * test_simple_pipeline -- function body as rendered in the Doxygen listing.
 *
 * Sends a single parameterized "SELECT $1" in pipeline mode over a blocking
 * connection, then walks through the results one PQgetResult() call at a
 * time: the query's result, the NULL that terminates it, the pipeline sync
 * result, and a final NULL.  Along the way it verifies that
 * PQexitPipelineMode() returns 0 (refuses) while work or the sync is still
 * pending, and returns 1 once everything has been consumed.  Every failed
 * expectation is reported through pg_fatal().
 *
 * NOTE(review): the embedded listing numbers skip in several places (1524,
 * 1526, 1528, 1546, 1548, 1550, 1557, 1560, 1566, 1568).  The statements on
 * those lines -- apparently the "if" conditions and trailing arguments of
 * the neighbouring pg_fatal() calls -- are not visible in this extract;
 * check the original source file before relying on this listing.
 */
1489{
1490 PGresult *res = NULL;
1491 const char *dummy_params[1] = {"1"};
1492 Oid dummy_param_oids[1] = {INT4OID};
1493
1494 fprintf(stderr, "simple pipeline... ");
1495
1496 /*
1497 * Enter pipeline mode and dispatch a set of operations, which we'll then
1498 * process the results of as they come in.
1499 *
1500 * For a simple case we should be able to do this without interim
1501 * processing of results since our output buffer will give us enough slush
1502 * to work with and we won't block on sending. So blocking mode is fine.
1503 */
1504 if (PQisnonblocking(conn))
1505 pg_fatal("Expected blocking connection mode");
1506
1507 if (PQenterPipelineMode(conn) != 1)
1508 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1509
1510 if (PQsendQueryParams(conn, "SELECT $1",
1511 1, dummy_param_oids, dummy_params,
1512 NULL, NULL, 0) != 1)
1513 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1514
/* A query has been queued but not synced or read: leaving must be refused. */
1515 if (PQexitPipelineMode(conn) != 0)
1516 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1517
1518 if (PQpipelineSync(conn) != 1)
1519 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1520
1521 res = PQgetResult(conn);
1522 if (res == NULL)
1523 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
/* NOTE(review): listing lines 1524 and 1526 are missing here (see header). */
1525
1527 pg_fatal("Unexpected result code %s from first pipeline item",
1529
1530 PQclear(res);
1531 res = NULL;
1532
1533 if (PQgetResult(conn) != NULL)
1534 pg_fatal("PQgetResult returned something extra after first query result.");
1535
1536 /*
1537 * Even though we've processed the result there's still a sync to come and
1538 * we can't exit pipeline mode yet
1539 */
1540 if (PQexitPipelineMode(conn) != 0)
1541 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1542
1543 res = PQgetResult(conn);
1544 if (res == NULL)
1545 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1547
1549 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1551
1552 PQclear(res);
1553 res = NULL;
1554
1555 if (PQgetResult(conn) != NULL)
1556 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1558
1559 /* We're still in pipeline mode... */
/*
 * NOTE(review): the condition on listing line 1560 is not shown; presumably
 * a PQpipelineStatus() test, since that function is listed under this
 * function's References -- confirm against the source.
 */
1561 pg_fatal("Fell out of pipeline mode somehow");
1562
1563 /* ... until we end it, which we can safely do now */
1564 if (PQexitPipelineMode(conn) != 1)
1565 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1567
1569 pg_fatal("Exiting pipeline mode didn't seem to work");
1570
1571 fprintf(stderr, "ok\n");
1572}

References conn, fprintf(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQ_PIPELINE_OFF, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQisnonblocking(), PQpipelineStatus(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendQueryParams(), and res.

Referenced by main().

◆ test_singlerowmode()

static void test_singlerowmode ( PGconn *  conn)
static

Definition at line 1575 of file libpq_pipeline.c.

/*
 * test_singlerowmode -- function body as rendered in the Doxygen listing.
 *
 * Exercises partial-result retrieval while in pipeline mode, in three parts:
 *
 *   1. Three "SELECT generate_series(42, $1)" queries followed by a sync.
 *      PQsetSingleRowMode() is requested for the first two only; the loop
 *      checks that those start with PGRES_SINGLE_TUPLE, that the third
 *      starts with PGRES_TUPLES_OK, and that every query ends with a
 *      PGRES_TUPLES_OK result.
 *   2. One query read after PQsetSingleRowMode(), then a plain "SELECT 1";
 *      per the in-code comment this checks that the single-row flag is
 *      reset between queries.
 *   3. A "generate_series(1, 5)" query read with PQsetChunkedRowsMode(conn, 3),
 *      expecting results of 3 rows, 2 rows and finally 0 rows.
 *
 * Every failed expectation is reported through pg_fatal().
 *
 * NOTE(review): the embedded listing numbers skip in places (e.g. 1583,
 * 1592, 1601, 1624, 1685, 1693, 1695, ...).  The statements on those lines
 * are not visible in this extract; check the original source file.
 */
1576{
1577 PGresult *res;
1578 int i;
1579 bool pipeline_ended = false;
1580
1581 if (PQenterPipelineMode(conn) != 1)
1582 pg_fatal("failed to enter pipeline mode: %s",
1584
1585 /* One series of three commands, using single-row mode for the first two. */
1586 for (i = 0; i < 3; i++)
1587 {
1588 char *param[1];
1589
/* Upper bound of the series: 44, 45, 46 for i = 0, 1, 2. */
1590 param[0] = psprintf("%d", 44 + i);
1591
/*
 * NOTE(review): listing line 1592 is missing; the argument list below
 * presumably belongs to an "if (PQsendQueryParams(conn," call -- confirm.
 */
1593 "SELECT generate_series(42, $1)",
1594 1,
1595 NULL,
1596 (const char **) param,
1597 NULL,
1598 NULL,
1599 0) != 1)
1600 pg_fatal("failed to send query: %s",
1602 pfree(param[0]);
1603 }
1604 if (PQpipelineSync(conn) != 1)
1605 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1606
/* One outer pass per query; i counts the queries whose results were read. */
1607 for (i = 0; !pipeline_ended; i++)
1608 {
1609 bool first = true;
1610 bool saw_ending_tuplesok;
1611 bool isSingleTuple = false;
1612
1613 /* Set single row mode for only first 2 SELECT queries */
1614 if (i < 2)
1615 {
1616 if (PQsetSingleRowMode(conn) != 1)
1617 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1618 }
1619
1620 /* Consume rows for this query */
1621 saw_ending_tuplesok = false;
1622 while ((res = PQgetResult(conn)) != NULL)
1623 {
/*
 * NOTE(review): listing line 1624 is missing; "est", used below, is
 * presumably declared there as the status of "res" -- confirm.
 */
1625
1626 if (est == PGRES_PIPELINE_SYNC)
1627 {
1628 fprintf(stderr, "end of pipeline reached\n");
1629 pipeline_ended = true;
1630 PQclear(res);
/* The sync shows up on the pass after the three queries, i.e. at i == 3. */
1631 if (i != 3)
1632 pg_fatal("Expected three results, got %d", i);
1633 break;
1634 }
1635
1636 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1637 if (first)
1638 {
1639 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1640 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1641 i, PQresStatus(est));
1642 if (i >= 2 && est != PGRES_TUPLES_OK)
1643 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1644 i, PQresStatus(est));
1645 first = false;
1646 }
1647
1648 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1649 switch (est)
1650 {
1651 case PGRES_TUPLES_OK:
1652 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1653 saw_ending_tuplesok = true;
/* After single-tuple results, the closing TUPLES_OK must carry no rows. */
1654 if (isSingleTuple)
1655 {
1656 if (PQntuples(res) == 0)
1657 fprintf(stderr, "all tuples received in query %d\n", i);
1658 else
1659 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1660 }
1661 break;
1662
1663 case PGRES_SINGLE_TUPLE:
1664 isSingleTuple = true;
1665 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1666 break;
1667
1668 default:
1669 pg_fatal("unexpected");
1670 }
1671 PQclear(res);
1672 }
1673 if (!pipeline_ended && !saw_ending_tuplesok)
1674 pg_fatal("didn't get expected terminating TUPLES_OK");
1675 }
1676
1677 /*
1678 * Now issue one command, get its results in with single-row mode, then
1679 * issue another command, and get its results in normal mode; make sure
1680 * the single-row mode flag is reset as expected.
1681 */
1682 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1683 0, NULL, NULL, NULL, NULL, 0) != 1)
1684 pg_fatal("failed to send query: %s",
1686 if (PQsendFlushRequest(conn) != 1)
1687 pg_fatal("failed to send flush request");
1688 if (PQsetSingleRowMode(conn) != 1)
1689 pg_fatal("PQsetSingleRowMode() failed");
1690 res = PQgetResult(conn);
1691 if (res == NULL)
1692 pg_fatal("unexpected NULL");
1694 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
/*
 * NOTE(review): no PQclear(res) is visible before "res" is overwritten here
 * and in the similar sequences below; the hidden listing lines look like
 * pg_fatal() arguments, so these results may simply never be freed --
 * confirm against the source.
 */
1696 res = PQgetResult(conn);
1697 if (res == NULL)
1698 pg_fatal("unexpected NULL");
1700 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1702 if (PQgetResult(conn) != NULL)
1703 pg_fatal("expected NULL result");
1704
1705 if (PQsendQueryParams(conn, "SELECT 1",
1706 0, NULL, NULL, NULL, NULL, 0) != 1)
1707 pg_fatal("failed to send query: %s",
1709 if (PQsendFlushRequest(conn) != 1)
1710 pg_fatal("failed to send flush request");
1711 res = PQgetResult(conn);
1712 if (res == NULL)
1713 pg_fatal("unexpected NULL");
1715 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1717 if (PQgetResult(conn) != NULL)
1718 pg_fatal("expected NULL result");
1719
1720 /*
1721 * Try chunked mode as well; make sure that it correctly delivers a
1722 * partial final chunk.
1723 */
1724 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1725 0, NULL, NULL, NULL, NULL, 0) != 1)
1726 pg_fatal("failed to send query: %s",
1728 if (PQsendFlushRequest(conn) != 1)
1729 pg_fatal("failed to send flush request");
/* Chunk size 3 over a 5-row result: expect 3 rows, then 2, then 0. */
1730 if (PQsetChunkedRowsMode(conn, 3) != 1)
1731 pg_fatal("PQsetChunkedRowsMode() failed");
1732 res = PQgetResult(conn);
1733 if (res == NULL)
1734 pg_fatal("unexpected NULL");
1736 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1739 if (PQntuples(res) != 3)
1740 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1741 res = PQgetResult(conn);
1742 if (res == NULL)
1743 pg_fatal("unexpected NULL");
1745 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1747 if (PQntuples(res) != 2)
1748 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1749 res = PQgetResult(conn);
1750 if (res == NULL)
1751 pg_fatal("unexpected NULL");
1753 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1755 if (PQntuples(res) != 0)
1756 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1757 if (PQgetResult(conn) != NULL)
1758 pg_fatal("expected NULL result");
1759
1760 if (PQexitPipelineMode(conn) != 1)
1761 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1762
1763 fprintf(stderr, "ok\n");
1764}
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
Definition: fe-exec.c:1965
ExecStatusType
Definition: libpq-fe.h:120
@ PGRES_TUPLES_CHUNK
Definition: libpq-fe.h:139
void pfree(void *pointer)
Definition: mcxt.c:1521
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References conn, fprintf(), i, pfree(), pg_fatal, PGRES_PIPELINE_SYNC, PGRES_SINGLE_TUPLE, PGRES_TUPLES_CHUNK, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryParams(), PQsetChunkedRowsMode(), PQsetSingleRowMode(), psprintf(), and res.

Referenced by main().

◆ test_transaction()

static void test_transaction ( PGconn *  conn)
static

Definition at line 1771 of file libpq_pipeline.c.

/*
 * test_transaction -- function body as rendered in the Doxygen listing.
 *
 * Recreates table pq_pipeline_tst, then queues in pipeline mode: a prepared
 * "ROLLBACK" statement, BEGIN, a failing "SELECT 0/0", an execution of the
 * prepared ROLLBACK, three INSERTs (values 1, 2, 3) and four syncs placed
 * between them.  Per the in-code comments only the last INSERT is expected
 * to succeed.  The result loop then drains everything, counting syncs, and
 * the function finally checks that the table holds exactly one row whose
 * value is "3".  Failed expectations are reported through pg_fatal().
 *
 * NOTE(review): the embedded listing numbers skip in places (e.g. 1779,
 * 1781, 1786, 1791, 1796, 1811, 1824, 1845, 1902).  The statements on those
 * lines -- apparently result-status checks, pg_fatal() arguments and the
 * "if (PQsendQueryParams(conn," call heads -- are not visible here; check
 * the original source file.
 */
1772{
1773 PGresult *res;
1774 bool expect_null;
1775 int num_syncs = 0;
1776
1777 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1778 "CREATE TABLE pq_pipeline_tst (id int)");
1780 pg_fatal("failed to create test table: %s",
1782 PQclear(res);
1783
1784 if (PQenterPipelineMode(conn) != 1)
1785 pg_fatal("failed to enter pipeline mode: %s",
1787 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1788 pg_fatal("could not send prepare on pipeline: %s",
1790
1792 "BEGIN",
1793 0, NULL, NULL, NULL, NULL, 0) != 1)
1794 pg_fatal("failed to send query: %s",
1797 "SELECT 0/0",
1798 0, NULL, NULL, NULL, NULL, 0) != 1)
1799 pg_fatal("failed to send query: %s",
1801
1802 /*
1803 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1804 * get out of the pipeline-aborted state first.
1805 */
1806 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1807 pg_fatal("failed to execute prepared: %s",
1809
1810 /* This insert fails because we're in pipeline-aborted state */
1812 "INSERT INTO pq_pipeline_tst VALUES (1)",
1813 0, NULL, NULL, NULL, NULL, 0) != 1)
1814 pg_fatal("failed to send query: %s",
1816 if (PQpipelineSync(conn) != 1)
1817 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
/* num_syncs counts syncs sent; the result loop below counts it back down. */
1818 num_syncs++;
1819
1820 /*
1821 * This insert fails even though the pipeline got a SYNC, because we're in
1822 * an aborted transaction
1823 */
1825 "INSERT INTO pq_pipeline_tst VALUES (2)",
1826 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 pg_fatal("failed to send query: %s",
1829 if (PQpipelineSync(conn) != 1)
1830 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1831 num_syncs++;
1832
1833 /*
1834 * Send ROLLBACK using prepared stmt. This one works because we just did
1835 * PQpipelineSync above.
1836 */
1837 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1838 pg_fatal("failed to execute prepared: %s",
1840
1841 /*
1842 * Now that we're out of a transaction and in pipeline-good mode, this
1843 * insert works
1844 */
1846 "INSERT INTO pq_pipeline_tst VALUES (3)",
1847 0, NULL, NULL, NULL, NULL, 0) != 1)
1848 pg_fatal("failed to send query: %s",
1850 /* Send two syncs now -- match up to SYNC messages below */
1851 if (PQpipelineSync(conn) != 1)
1852 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1853 num_syncs++;
1854 if (PQpipelineSync(conn) != 1)
1855 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1856 num_syncs++;
1857
/*
 * Drain all results.  expect_null encodes the ordering rule enforced here:
 * every non-sync result must be followed by exactly one NULL, while a sync
 * result is not followed by one.  The loop ends once all sent syncs have
 * been received.
 */
1858 expect_null = false;
1859 for (int i = 0;; i++)
1860 {
1861 ExecStatusType restype;
1862
1863 res = PQgetResult(conn);
1864 if (res == NULL)
1865 {
1866 printf("%d: got NULL result\n", i);
1867 if (!expect_null)
1868 pg_fatal("did not expect NULL here");
1869 expect_null = false;
1870 continue;
1871 }
1872 restype = PQresultStatus(res);
1873 printf("%d: got status %s", i, PQresStatus(restype));
1874 if (expect_null)
1875 pg_fatal("expected NULL");
1876 if (restype == PGRES_FATAL_ERROR)
1877 printf("; error: %s", PQerrorMessage(conn));
1878 else if (restype == PGRES_PIPELINE_ABORTED)
1879 {
1880 printf(": command didn't run because pipeline aborted\n");
1881 }
1882 else
1883 printf("\n");
1884 PQclear(res);
1885
1886 if (restype == PGRES_PIPELINE_SYNC)
1887 num_syncs--;
1888 else
1889 expect_null = true;
1890 if (num_syncs <= 0)
1891 break;
1892 }
1893 if (PQgetResult(conn) != NULL)
1894 pg_fatal("returned something extra after all the syncs: %s",
1896
1897 if (PQexitPipelineMode(conn) != 1)
1898 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1899
1900 /* We expect to find one tuple containing the value "3" */
1901 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1903 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1904 if (PQntuples(res) != 1)
1905 pg_fatal("did not get 1 tuple");
1906 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1907 pg_fatal("did not get expected tuple");
1908 PQclear(res);
1909
1910 fprintf(stderr, "ok\n");
1911}

References conn, fprintf(), i, pg_fatal, PGRES_COMMAND_OK, PGRES_FATAL_ERROR, PGRES_PIPELINE_ABORTED, PGRES_PIPELINE_SYNC, PGRES_TUPLES_OK, PQclear(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQexitPipelineMode(), PQgetResult(), PQgetvalue(), PQntuples(), PQpipelineSync(), PQresStatus(), PQresultStatus(), PQsendPrepare(), PQsendQueryParams(), PQsendQueryPrepared(), printf, and res.

Referenced by main().

◆ test_uniqviol()

static void test_uniqviol ( PGconn *  conn)
static

Definition at line 1919 of file libpq_pipeline.c.

/*
 * test_uniqviol -- function body as rendered in the Doxygen listing.
 *
 * Creates table ppln_uniqviol with a bigint primary key, opens a
 * transaction, prepares an INSERT ... RETURNING statement, and then streams
 * executions of it in pipeline mode from a select()-driven loop that
 * alternates between reading available results and sending more queries.
 * Once the output buffer has filled at least once, one execution reuses an
 * already-sent key to provoke a uniqueness violation.  The test passes only
 * if process_result() reports an error exactly once: a second error is
 * fatal, and so is finishing with none.
 *
 * Bookkeeping used by the loop:
 *   ctr        next fresh key value to insert
 *   numsent    number of executions queued so far
 *   results    number of results consumed so far
 *   socketful  value of numsent the first time PQflush() returned 1
 *   switched   how many times sending stopped because PQflush() returned 1
 *
 * NOTE(review): the embedded listing numbers skip at 1941, 1949 and 1953.
 * Line 1941 presumably holds the PQsetnonblocking() call named in this
 * function's References list, and 1949/1953 the result-status checks in
 * front of the pg_fatal() calls -- confirm against the source.
 * NOTE(review): no PQclear() of the PQexec()/PQprepare() results is visible
 * in this listing; possibly they are never freed -- confirm.
 */
1920{
1921 int sock = PQsocket(conn);
1922 PGresult *res;
1923 Oid paramTypes[2] = {INT8OID, INT8OID};
1924 const char *paramValues[2];
1925 char paramValue0[MAXINT8LEN];
1926 char paramValue1[MAXINT8LEN];
1927 int ctr = 0;
1928 int numsent = 0;
1929 int results = 0;
1930 bool read_done = false;
1931 bool write_done = false;
1932 bool error_sent = false;
1933 bool got_error = false;
1934 int switched = 0;
1935 int socketful = 0;
1936 fd_set in_fds;
1937 fd_set out_fds;
1938
1939 fprintf(stderr, "uniqviol ...");
1940
1942
/* The second parameter is the constant "42"; only the first one varies. */
1943 paramValues[0] = paramValue0;
1944 paramValues[1] = paramValue1;
1945 sprintf(paramValue1, "42");
1946
1947 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1948 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1950 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1951
1952 res = PQexec(conn, "begin");
1954 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1955
1956 res = PQprepare(conn, "insertion",
1957 "insert into ppln_uniqviol values ($1, $2) returning id",
1958 2, paramTypes);
1959 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1960 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1961
1962 if (PQenterPipelineMode(conn) != 1)
1963 pg_fatal("failed to enter pipeline mode");
1964
1965 while (!read_done)
1966 {
1967 /*
1968 * Avoid deadlocks by reading everything the server has sent before
1969 * sending anything. (Special precaution is needed here to process
1970 * PQisBusy before testing the socket for read-readiness, because the
1971 * socket does not turn read-ready after "sending" queries in aborted
1972 * pipeline mode.)
1973 */
1974 while (PQisBusy(conn) == 0)
1975 {
1976 bool new_error;
1977
/* Nothing outstanding: stop reading; finished if sending is over too. */
1978 if (results >= numsent)
1979 {
1980 if (write_done)
1981 read_done = true;
1982 break;
1983 }
1984
1985 res = PQgetResult(conn);
1986 new_error = process_result(conn, res, results, numsent);
1987 if (new_error && got_error)
1988 pg_fatal("got two errors");
1989 got_error |= new_error;
/* Post-increment: true when the result just handled was the last one owed. */
1990 if (results++ >= numsent - 1)
1991 {
1992 if (write_done)
1993 read_done = true;
1994 break;
1995 }
1996 }
1997
1998 if (read_done)
1999 break;
2000
2001 FD_ZERO(&out_fds);
2002 FD_SET(sock, &out_fds);
2003
2004 FD_ZERO(&in_fds);
2005 FD_SET(sock, &in_fds);
2006
/* Wait for write-readiness only while there is still something to send. */
2007 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2008 {
2009 if (errno == EINTR)
2010 continue;
2011 pg_fatal("select() failed: %m");
2012 }
2013
2014 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2015 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2016
2017 /*
2018 * If the socket is writable and we haven't finished sending queries,
2019 * send some.
2020 */
2021 if (!write_done && FD_ISSET(sock, &out_fds))
2022 {
2023 for (;;)
2024 {
2025 int flush;
2026
2027 /*
2028 * provoke uniqueness violation exactly once after having
2029 * switched to read mode.
2030 */
/*
 * "switched >= 1" is tested first, so socketful has already been set to a
 * positive numsent by the flush branch below before it is used as a divisor.
 */
2031 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2032 {
2033 sprintf(paramValue0, "%d", numsent / 2);
2034 fprintf(stderr, "E");
2035 error_sent = true;
2036 }
2037 else
2038 {
2039 fprintf(stderr, ".");
2040 sprintf(paramValue0, "%d", ctr++);
2041 }
2042
2043 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2044 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2045 numsent++;
2046
2047 /* Are we done writing? */
2048 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2049 {
2050 if (PQsendFlushRequest(conn) != 1)
2051 pg_fatal("failed to send flush request");
2052 write_done = true;
2053 fprintf(stderr, "\ndone writing\n");
2054 PQflush(conn);
2055 break;
2056 }
2057
2058 /* is the outgoing socket full? */
2059 flush = PQflush(conn);
2060 if (flush == -1)
2061 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2062 if (flush == 1)
2063 {
/* Remember how many queries were sent the first time this happened. */
2064 if (socketful == 0)
2065 socketful = numsent;
2066 fprintf(stderr, "\nswitch to reading\n");
2067 switched++;
2068 break;
2069 }
2070 }
2071 }
2072 }
2073
2074 if (!got_error)
2075 pg_fatal("did not get expected error");
2076
2077 fprintf(stderr, "ok\n");
2078}
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
#define sprintf
Definition: port.h:241

References conn, EINTR, fprintf(), MAXINT8LEN, pg_fatal, PGRES_COMMAND_OK, PQconsumeInput(), PQenterPipelineMode(), PQerrorMessage(), PQexec(), PQflush(), PQgetResult(), PQisBusy(), PQprepare(), PQresultStatus(), PQsendFlushRequest(), PQsendQueryPrepared(), PQsetnonblocking(), PQsocket(), process_result(), res, select, and sprintf.

Referenced by main().

◆ usage()

static void usage ( const char *  progname)
static

Definition at line 2135 of file libpq_pipeline.c.

/*
 * usage -- function body as rendered in the Doxygen listing.
 *
 * Writes the command-line help text to stderr: a one-line description, the
 * two accepted invocation forms, and the -t (trace file) and -r (number of
 * rows) options.  "progname" is substituted into the first three messages.
 */
2136{
2137 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2138 fprintf(stderr, "Usage:\n");
2139 fprintf(stderr, " %s [OPTION] tests\n", progname);
2140 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2141 fprintf(stderr, "\nOptions:\n");
2142 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2143 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2144}
static const char *const progname

References fprintf(), and progname.

Referenced by main().

◆ va_end()

◆ va_start()

◆ vfprintf()

vfprintf ( stderr  ,
fmt  ,
args   
)

◆ wait_for_connection_state()

static void wait_for_connection_state ( int  line,
PGconn *  monitorConn,
int  procpid,
char *  state,
char *  event 
)
static

Definition at line 121 of file libpq_pipeline.c.

/*
 * wait_for_connection_state -- function body as rendered in the Doxygen
 * listing.
 *
 * Polls pg_stat_activity through monitorConn until the backend whose pid is
 * "procpid" shows the requested value: its "state" column when "state" is
 * given, otherwise its "wait_event" column matched against "event".  The
 * Assert requires that exactly one of "state" and "event" is NULL.  The
 * query returns count(*); the loop exits as soon as that count is not "0"
 * and otherwise sleeps 10ms before asking again.  There is no timeout in
 * the visible code.  Failures are reported with pg_fatal_impl(), passing
 * along the caller-supplied "line".
 *
 * NOTE(review): listing line 149 is missing; it presumably holds the
 * PQresultStatus() check guarding the first pg_fatal_impl() call
 * (PQresultStatus and PGRES_TUPLES_OK appear in this function's References
 * list) -- confirm against the source.
 */
123{
124 const Oid paramTypes[] = {INT4OID, TEXTOID};
125 const char *paramValues[2];
126 char *pidstr = psprintf("%d", procpid);
127
128 Assert((state == NULL) ^ (event == NULL));
129
/* $1 is the pid as text; $2 is whichever of state/event was supplied. */
130 paramValues[0] = pidstr;
131 paramValues[1] = state ? state : event;
132
133 while (true)
134 {
135 PGresult *res;
136 char *value;
137
138 if (state != NULL)
139 res = PQexecParams(monitorConn,
140 "SELECT count(*) FROM pg_stat_activity WHERE "
141 "pid = $1 AND state = $2",
142 2, paramTypes, paramValues, NULL, NULL, 0);
143 else
144 res = PQexecParams(monitorConn,
145 "SELECT count(*) FROM pg_stat_activity WHERE "
146 "pid = $1 AND wait_event = $2",
147 2, paramTypes, paramValues, NULL, NULL, 0);
148
150 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
151 if (PQntuples(res) != 1)
152 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
153 if (PQnfields(res) != 1)
154 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
155 value = PQgetvalue(res, 0, 0);
/* Any count other than "0" means the backend reached the awaited condition. */
156 if (strcmp(value, "0") != 0)
157 {
158 PQclear(res);
159 break;
160 }
161 PQclear(res);
162
163 /* wait 10ms before polling again */
164 pg_usleep(10000);
165 }
166
167 pfree(pidstr);
168}
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
static struct @162 value
void pg_usleep(long microsec)
Definition: signal.c:53
Definition: regguts.h:323

References Assert(), pfree(), pg_usleep(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexecParams(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), psprintf(), res, and value.

Referenced by send_cancellable_query_impl().

Variable Documentation

◆ create_table_sql

const char* const create_table_sql
static
Initial value:
=
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
"int8filler int8);"

Definition at line 46 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ drop_table_sql

const char* const drop_table_sql
static
Initial value:
=
"DROP TABLE IF EXISTS pq_pipeline_demo"

Definition at line 44 of file libpq_pipeline.c.

Referenced by test_pipeline_abort(), and test_pipelined_insert().

◆ fmt

static void const char * fmt

Definition at line 27 of file libpq_pipeline.c.

Referenced by _allocAH(), ahprintf(), appendJSONKeyValueFmt(), appendPQExpBuffer(), appendPQExpBufferVA(), appendStringInfo(), appendStringInfoVA(), archprintf(), bail_out(), CreateArchive(), date_test_defmt(), date_test_fmt(), datetime_to_char_body(), dblink_res_error(), do_pset(), do_serialize(), do_to_timestamp(), dttofmtasc_replace(), ecpg_log(), emit_tap_output(), emit_tap_output_v(), err(), errmsg(), errmsg_internal(), errx(), exec_prog(), executeQueryOrDie(), find_end_token(), float4_to_char(), float8_to_char(), fmtfloat(), fmtlong(), format_elog_string(), int4_to_char(), int8_to_char(), interval_to_char(), libpq_append_conn_error(), libpq_append_error(), log_error(), main(), manifest_report_error(), my_strftime(), numeric_to_char(), numeric_to_number(), OpenArchive(), parallel_exec_prog(), parse_datetime(), pg_fatal(), pg_fprintf(), pg_log(), pg_log_filter_error(), pg_log_generic(), pg_log_generic_v(), pg_log_v(), pg_printf(), pg_snprintf(), pg_sprintf(), pg_strfromd(), pg_vfprintf(), pg_vprintf(), pg_vsnprintf(), pg_vsprintf(), PGTYPESdate_defmt_asc(), PGTYPEStimestamp_defmt_asc(), PGTYPEStimestamp_defmt_scan(), PLy_elog_impl(), PLy_exception_set(), pqInternalNotice(), prep_status(), prep_status_progress(), print_lo_result(), printfPQExpBuffer(), ProcessCopyOptions(), psprintf(), pvsnprintf(), px_debug(), rdefmtdate(), ReadHead(), report_backup_error(), report_fatal_error(), report_invalid_record(), report_manifest_error(), report_status(), ReportWalSummaryError(), rfmtdate(), rfmtlong(), tarPrintf(), text_format(), timestamp_to_char(), timestamptz_to_char(), to_date(), to_timestamp(), walsummary_error_callback(), warn_or_exit_horribly(), and write_stderr().

◆ insert_sql

const char* const insert_sql
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)"

Definition at line 49 of file libpq_pipeline.c.

Referenced by test_pipeline_abort().

◆ insert_sql2

const char* const insert_sql2
static
Initial value:
=
"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)"

Definition at line 51 of file libpq_pipeline.c.

Referenced by test_pipelined_insert().

◆ progname

const char* const progname = "libpq_pipeline"
static

Definition at line 32 of file libpq_pipeline.c.

Referenced by usage().

◆ tracefile

char* tracefile = NULL
static

Definition at line 35 of file libpq_pipeline.c.

Referenced by main().