PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pgbench.c
Go to the documentation of this file.
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * src/bin/pgbench/pgbench.c
8  * Copyright (c) 2000-2017, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29 
30 #ifdef WIN32
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
32 #endif /* ! WIN32 */
33 
34 #include "postgres_fe.h"
35 
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "portability/instr_time.h"
39 
40 #include <ctype.h>
41 #include <float.h>
42 #include <limits.h>
43 #include <math.h>
44 #include <signal.h>
45 #include <time.h>
46 #include <sys/time.h>
47 #ifdef HAVE_SYS_SELECT_H
48 #include <sys/select.h>
49 #endif
50 
51 #ifdef HAVE_SYS_RESOURCE_H
52 #include <sys/resource.h> /* for getrlimit */
53 #endif
54 
55 #ifndef M_PI
56 #define M_PI 3.14159265358979323846
57 #endif
58 
59 #include "pgbench.h"
60 
61 #define ERRCODE_UNDEFINED_TABLE "42P01"
62 
63 /*
64  * Multi-platform pthread implementations
65  */
66 
67 #ifdef WIN32
68 /* Use native win32 threads on Windows */
69 typedef struct win32_pthread *pthread_t;
70 typedef int pthread_attr_t;
71 
72 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
73 static int pthread_join(pthread_t th, void **thread_return);
74 #elif defined(ENABLE_THREAD_SAFETY)
75 /* Use platform-dependent pthread capability */
76 #include <pthread.h>
77 #else
78 /* No threads implementation, use none (-j 1) */
79 #define pthread_t void *
80 #endif
81 
82 
83 /********************************************************************
84  * some configurable parameters */
85 
86 /* max number of clients allowed */
87 #ifdef FD_SETSIZE
88 #define MAXCLIENTS (FD_SETSIZE - 10)
89 #else
90 #define MAXCLIENTS 1024
91 #endif
92 
93 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
94 #define DEFAULT_NXACTS 10 /* default nxacts */
95 
96 #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */
97 
98 int nxacts = 0; /* number of transactions per client */
99 int duration = 0; /* duration in seconds */
100 int64 end_time = 0; /* when to stop in micro seconds, under -T */
101 
102 /*
103  * scaling factor. for example, scale = 10 will make 1000000 tuples in
104  * pgbench_accounts table.
105  */
106 int scale = 1;
107 
108 /*
109  * fillfactor. for example, fillfactor = 90 will use only 90 percent
110  * space during inserts and leave 10 percent free.
111  */
112 int fillfactor = 100;
113 
114 /*
115  * create foreign key constraints on the tables?
116  */
117 int foreign_keys = 0;
118 
119 /*
120  * use unlogged tables?
121  */
123 
124 /*
125  * log sampling rate (1.0 = log everything, 0.0 = option not given)
126  */
127 double sample_rate = 0.0;
128 
129 /*
130  * When threads are throttled to a given rate limit, this is the target delay
131  * to reach that rate in usec. 0 is the default and means no throttling.
132  */
133 int64 throttle_delay = 0;
134 
135 /*
136  * Transactions which take longer than this limit (in usec) are counted as
137  * late, and reported as such, although they are completed anyway. When
138  * throttling is enabled, execution time slots that are more than this late
139  * are skipped altogether, and counted separately.
140  */
141 int64 latency_limit = 0;
142 
143 /*
144  * tablespace selection
145  */
146 char *tablespace = NULL;
147 char *index_tablespace = NULL;
148 
149 /*
150  * end of configurable parameters
151  *********************************************************************/
152 
153 #define nbranches 1 /* Makes little sense to change this. Change
154  * -s instead */
155 #define ntellers 10
156 #define naccounts 100000
157 
158 /*
 * The scale factor at/beyond which 32-bit integers are too small to hold
 * every account number (naccounts * scale).
161  *
162  * Although the actual threshold is 21474, we use 20000 because it is easier to
163  * document and remember, and isn't that far away from the real threshold.
164  */
165 #define SCALE_32BIT_THRESHOLD 20000
166 
167 bool use_log; /* log transaction latencies to a file */
168 bool use_quiet; /* quiet logging onto stderr */
169 int agg_interval; /* log aggregates instead of individual
170  * transactions */
171 bool per_script_stats = false; /* whether to collect stats per script */
172 int progress = 0; /* thread progress report every this seconds */
173 bool progress_timestamp = false; /* progress report with Unix time */
174 int nclients = 1; /* number of clients */
175 int nthreads = 1; /* number of threads */
176 bool is_connect; /* establish connection for each transaction */
177 bool is_latencies; /* report per-command latencies */
178 int main_pid; /* main process id used in log filename */
179 
180 char *pghost = "";
181 char *pgport = "";
182 char *login = NULL;
183 char *dbName;
184 char *logfile_prefix = NULL;
185 const char *progname;
186 
187 #define WSEP '@' /* weight separator */
188 
189 volatile bool timer_exceeded = false; /* flag from signal handler */
190 
191 /*
192  * Variable definitions. If a variable has a string value, "value" is that
193  * value, is_numeric is false, and num_value is undefined. If the value is
194  * known to be numeric, is_numeric is true and num_value contains the value
195  * (in any permitted numeric variant). In this case "value" contains the
196  * string equivalent of the number, if we've had occasion to compute that,
197  * or NULL if we haven't.
198  */
199 typedef struct
200 {
201  char *name; /* variable's name */
202  char *value; /* its value in string form, if known */
203  bool is_numeric; /* is numeric value known? */
204  PgBenchValue num_value; /* variable's value in numeric form */
205 } Variable;
206 
207 #define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
208 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
209 
/*
 * Simple data structure to keep stats about something.
 *
 * min and max are only meaningful once count > 0: the accumulation code in
 * this file overwrites them with the first value it sees (count == 0)
 * instead of comparing against whatever they held before.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
 int64 count; /* how many values were encountered */
 double min; /* the minimum seen */
 double max; /* the maximum seen */
 double sum; /* sum of values */
 double sum2; /* sum of squared values */
} SimpleStats;
224 
225 /*
226  * Data structure to hold various statistics: per-thread and per-script stats
227  * are maintained and merged together.
228  */
229 typedef struct StatsData
230 {
231  time_t start_time; /* interval start time, for aggregates */
232  int64 cnt; /* number of transactions, including skipped */
233  int64 skipped; /* number of transactions skipped under --rate
234  * and --latency-limit */
237 } StatsData;
238 
239 /*
240  * Connection state machine states.
241  */
242 typedef enum
243 {
244  /*
245  * The client must first choose a script to execute. Once chosen, it can
246  * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
247  * right away (state CSTATE_START_TX).
248  */
250 
251  /*
252  * In CSTATE_START_THROTTLE state, we calculate when to begin the next
253  * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
254  * sleeps until that moment. (If throttling is not enabled, doCustom()
255  * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
256  */
259 
260  /*
261  * CSTATE_START_TX performs start-of-transaction processing. Establishes
262  * a new connection for the transaction, in --connect mode, and records
263  * the transaction start time.
264  */
266 
267  /*
268  * We loop through these states, to process each command in the script:
269  *
270  * CSTATE_START_COMMAND starts the execution of a command. On a SQL
271  * command, the command is sent to the server, and we move to
272  * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
273  * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
274  * meta-commands are executed immediately.
275  *
276  * CSTATE_WAIT_RESULT waits until we get a result set back from the server
277  * for the current command.
278  *
279  * CSTATE_SLEEP waits until the end of \sleep.
280  *
281  * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
282  * command counter, and loops back to CSTATE_START_COMMAND state.
283  */
288 
289  /*
290  * CSTATE_END_TX performs end-of-transaction processing. Calculates
291  * latency, and logs the transaction. In --connect mode, closes the
292  * current connection. Chooses the next script to execute and starts over
293  * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
294  * more work to do.
295  */
297 
298  /*
299  * Final states. CSTATE_ABORTED means that the script execution was
300  * aborted because a command failed, CSTATE_FINISHED means success.
301  */
305 
306 /*
307  * Connection state.
308  */
309 typedef struct
310 {
311  PGconn *con; /* connection handle to DB */
312  int id; /* client No. */
313  ConnectionStateEnum state; /* state machine's current state. */
314 
315  int use_file; /* index in sql_script for this client */
316  int command; /* command number in script */
317 
318  /* client variables */
319  Variable *variables; /* array of variable definitions */
320  int nvariables; /* number of variables */
321  bool vars_sorted; /* are variables sorted by name? */
322 
323  /* various times about current transaction */
324  int64 txn_scheduled; /* scheduled start time of transaction (usec) */
325  int64 sleep_until; /* scheduled start time of next cmd (usec) */
326  instr_time txn_begin; /* used for measuring schedule lag times */
327  instr_time stmt_begin; /* used for measuring statement latencies */
328 
329  bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
330 
331  /* per client collected stats */
332  int64 cnt; /* client transaction count, for -t */
333  int ecnt; /* error count */
334 } CState;
335 
336 /*
337  * Thread state
338  */
339 typedef struct
340 {
341  int tid; /* thread id */
342  pthread_t thread; /* thread handle */
343  CState *state; /* array of CState */
344  int nstate; /* length of state[] */
345  unsigned short random_state[3]; /* separate randomness for each thread */
346  int64 throttle_trigger; /* previous/next throttling (us) */
347  FILE *logfile; /* where to log, or NULL */
348 
349  /* per thread collected stats */
350  instr_time start_time; /* thread start time */
353  int64 latency_late; /* executed but late transactions */
354 } TState;
355 
356 #define INVALID_THREAD ((pthread_t) 0)
357 
358 /*
359  * queries read from files
360  */
361 #define SQL_COMMAND 1
362 #define META_COMMAND 2
363 #define MAX_ARGS 10
364 
365 typedef enum QueryMode
366 {
367  QUERY_SIMPLE, /* simple query */
368  QUERY_EXTENDED, /* extended query */
369  QUERY_PREPARED, /* extended query with prepared statements */
371 } QueryMode;
372 
374 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
375 
376 typedef struct
377 {
378  char *line; /* text of command line */
379  int command_num; /* unique index of this Command struct */
380  int type; /* command type (SQL_COMMAND or META_COMMAND) */
381  int argc; /* number of command words */
382  char *argv[MAX_ARGS]; /* command word list */
383  PgBenchExpr *expr; /* parsed expression, if needed */
384  SimpleStats stats; /* time spent in this command */
385 } Command;
386 
387 typedef struct ParsedScript
388 {
389  const char *desc; /* script descriptor (eg, file name) */
390  int weight; /* selection weight */
391  Command **commands; /* NULL-terminated array of Commands */
392  StatsData stats; /* total time spent in script */
393 } ParsedScript;
394 
395 static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
396 static int num_scripts; /* number of scripts in sql_script[] */
397 static int num_commands = 0; /* total number of Command structs */
398 static int64 total_weight = 0;
399 
400 static int debug = 0; /* debug flag */
401 
402 /* Builtin test scripts */
403 typedef struct BuiltinScript
404 {
405  const char *name; /* very short name for -b ... */
406  const char *desc; /* short description */
407  const char *script; /* actual pgbench script */
408 } BuiltinScript;
409 
411 {
412  {
413  "tpcb-like",
414  "<builtin: TPC-B (sort of)>",
415  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
416  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
417  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
418  "\\set delta random(-5000, 5000)\n"
419  "BEGIN;\n"
420  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
421  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
422  "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
423  "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
424  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
425  "END;\n"
426  },
427  {
428  "simple-update",
429  "<builtin: simple update>",
430  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
431  "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
432  "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
433  "\\set delta random(-5000, 5000)\n"
434  "BEGIN;\n"
435  "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
436  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
437  "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
438  "END;\n"
439  },
440  {
441  "select-only",
442  "<builtin: select only>",
443  "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
444  "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
445  }
446 };
447 
448 
449 /* Function prototypes */
450 static void setIntValue(PgBenchValue *pv, int64 ival);
451 static void setDoubleValue(PgBenchValue *pv, double dval);
452 static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
453 static void doLog(TState *thread, CState *st,
454  StatsData *agg, bool skipped, double latency, double lag);
455 static void processXactStats(TState *thread, CState *st, instr_time *now,
456  bool skipped, StatsData *agg);
457 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
458 static void addScript(ParsedScript script);
459 static void *threadRun(void *arg);
460 static void setalarm(int seconds);
461 
462 
463 /* callback functions for our flex lexer */
465  NULL, /* don't need get_variable functionality */
467 };
468 
469 
/*
 * Print the command-line help text on stdout.
 *
 * The whole text is a single printf() format; the global progname is
 * substituted for both %s placeholders, and "%%" yields a literal percent
 * sign in the --sampling-rate line.
 */
static void
usage(void)
{
 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
 "Usage:\n"
 " %s [OPTION]... [DBNAME]\n"
 "\nInitialization options:\n"
 " -i, --initialize invokes initialization mode\n"
 " -F, --fillfactor=NUM set fill factor\n"
 " -n, --no-vacuum do not run VACUUM after initialization\n"
 " -q, --quiet quiet logging (one message each 5 seconds)\n"
 " -s, --scale=NUM scaling factor\n"
 " --foreign-keys create foreign key constraints between tables\n"
 " --index-tablespace=TABLESPACE\n"
 " create indexes in the specified tablespace\n"
 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
 " --unlogged-tables create tables as unlogged tables\n"
 "\nOptions to select what to run:\n"
 " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
 " (use \"-b list\" to list available scripts)\n"
 " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
 " (same as \"-b simple-update\")\n"
 " -S, --select-only perform SELECT-only transactions\n"
 " (same as \"-b select-only\")\n"
 "\nBenchmarking options:\n"
 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
 " -C, --connect establish new connection for each transaction\n"
 " -D, --define=VARNAME=VALUE\n"
 " define variable for use by custom script\n"
 " -j, --jobs=NUM number of threads (default: 1)\n"
 " -l, --log write transaction times to log file\n"
 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
 " -M, --protocol=simple|extended|prepared\n"
 " protocol for submitting queries (default: simple)\n"
 " -n, --no-vacuum do not run VACUUM before tests\n"
 " -P, --progress=NUM show thread progress report every NUM seconds\n"
 " -r, --report-latencies report average latency per command\n"
 " -R, --rate=NUM target rate in transactions per second\n"
 " -s, --scale=NUM report this scale factor in output\n"
 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
 " -T, --time=NUM duration of benchmark test in seconds\n"
 " -v, --vacuum-all vacuum all four standard tables before tests\n"
 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
 " --log-prefix=PREFIX prefix for transaction time log file\n"
 " (default: \"pgbench_log\")\n"
 " --progress-timestamp use Unix epoch timestamps for progress\n"
 " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
 "\nCommon options:\n"
 " -d, --debug print debugging output\n"
 " -h, --host=HOSTNAME database server host or socket directory\n"
 " -p, --port=PORT database server port number\n"
 " -U, --username=USERNAME connect as specified database user\n"
 " -V, --version output version information, then exit\n"
 " -?, --help show this help, then exit\n"
 "\n"
 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
 progname, progname);
}
529 
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$": optional leading
 * whitespace, an optional sign, then one or more decimal digits and nothing
 * else.
 *
 * An empty string, a whitespace-only string and a bare sign are all
 * rejected, since none of them contains a digit.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * Require at least one digit.  The end of the string must fail this test
	 * too, otherwise "" and "-" would be reported as integers.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits; isdigit('\0') is false, so this stops at the end */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
555 
556 
557 /*
558  * strtoint64 -- convert a string to 64-bit integer
559  *
560  * This function is a modified version of scanint8() from
561  * src/backend/utils/adt/int8.c.
562  */
563 int64
564 strtoint64(const char *str)
565 {
566  const char *ptr = str;
567  int64 result = 0;
568  int sign = 1;
569 
570  /*
571  * Do our own scan, rather than relying on sscanf which might be broken
572  * for long long.
573  */
574 
575  /* skip leading spaces */
576  while (*ptr && isspace((unsigned char) *ptr))
577  ptr++;
578 
579  /* handle sign */
580  if (*ptr == '-')
581  {
582  ptr++;
583 
584  /*
585  * Do an explicit check for INT64_MIN. Ugly though this is, it's
586  * cleaner than trying to get the loop below to handle it portably.
587  */
588  if (strncmp(ptr, "9223372036854775808", 19) == 0)
589  {
590  result = PG_INT64_MIN;
591  ptr += 19;
592  goto gotdigits;
593  }
594  sign = -1;
595  }
596  else if (*ptr == '+')
597  ptr++;
598 
599  /* require at least one digit */
600  if (!isdigit((unsigned char) *ptr))
601  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
602 
603  /* process digits */
604  while (*ptr && isdigit((unsigned char) *ptr))
605  {
606  int64 tmp = result * 10 + (*ptr++ - '0');
607 
608  if ((tmp / 10) != result) /* overflow? */
609  fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
610  result = tmp;
611  }
612 
613 gotdigits:
614 
615  /* allow trailing whitespace, but not other trailing chars */
616  while (*ptr != '\0' && isspace((unsigned char) *ptr))
617  ptr++;
618 
619  if (*ptr != '\0')
620  fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
621 
622  return ((sign < 0) ? -result : result);
623 }
624 
625 /* random number generator: uniform distribution from min to max inclusive */
626 static int64
627 getrand(TState *thread, int64 min, int64 max)
628 {
629  /*
630  * Odd coding is so that min and max have approximately the same chance of
631  * being selected as do numbers between them.
632  *
633  * pg_erand48() is thread-safe and concurrent, which is why we use it
634  * rather than random(), which in glibc is non-reentrant, and therefore
635  * protected by a mutex, and therefore a bottleneck on machines with many
636  * CPUs.
637  */
638  return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
639 }
640 
641 /*
642  * random number generator: exponential distribution from min to max inclusive.
643  * the parameter is so that the density of probability for the last cut-off max
644  * value is exp(-parameter).
645  */
646 static int64
647 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
648 {
649  double cut,
650  uniform,
651  rand;
652 
653  /* abort if wrong parameter, but must really be checked beforehand */
654  Assert(parameter > 0.0);
655  cut = exp(-parameter);
656  /* erand in [0, 1), uniform in (0, 1] */
657  uniform = 1.0 - pg_erand48(thread->random_state);
658 
659  /*
660  * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
661  */
662  Assert((1.0 - cut) != 0.0);
663  rand = -log(cut + (1.0 - cut) * uniform) / parameter;
664  /* return int64 random number within between min and max */
665  return min + (int64) ((max - min + 1) * rand);
666 }
667 
668 /* random number generator: gaussian distribution from min to max inclusive */
669 static int64
670 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
671 {
672  double stdev;
673  double rand;
674 
675  /* abort if parameter is too low, but must really be checked beforehand */
676  Assert(parameter >= MIN_GAUSSIAN_PARAM);
677 
678  /*
679  * Get user specified random number from this loop, with -parameter <
680  * stdev <= parameter
681  *
682  * This loop is executed until the number is in the expected range.
683  *
684  * As the minimum parameter is 2.0, the probability of looping is low:
685  * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
686  * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
687  * the worst case. For a parameter value of 5.0, the looping probability
688  * is about e^{-5} * 2 / pi ~ 0.43%.
689  */
690  do
691  {
692  /*
693  * pg_erand48 generates [0,1), but for the basic version of the
694  * Box-Muller transform the two uniformly distributed random numbers
695  * are expected in (0, 1] (see
696  * http://en.wikipedia.org/wiki/Box_muller)
697  */
698  double rand1 = 1.0 - pg_erand48(thread->random_state);
699  double rand2 = 1.0 - pg_erand48(thread->random_state);
700 
701  /* Box-Muller basic form transform */
702  double var_sqrt = sqrt(-2.0 * log(rand1));
703 
704  stdev = var_sqrt * sin(2.0 * M_PI * rand2);
705 
706  /*
707  * we may try with cos, but there may be a bias induced if the
708  * previous value fails the test. To be on the safe side, let us try
709  * over.
710  */
711  }
712  while (stdev < -parameter || stdev >= parameter);
713 
714  /* stdev is in [-parameter, parameter), normalization to [0,1) */
715  rand = (stdev + parameter) / (parameter * 2.0);
716 
717  /* return int64 random number within between min and max */
718  return min + (int64) ((max - min + 1) * rand);
719 }
720 
721 /*
722  * random number generator: generate a value, such that the series of values
723  * will approximate a Poisson distribution centered on the given value.
724  */
725 static int64
726 getPoissonRand(TState *thread, int64 center)
727 {
728  /*
729  * Use inverse transform sampling to generate a value > 0, such that the
730  * expected (i.e. average) value is the given argument.
731  */
732  double uniform;
733 
734  /* erand in [0, 1), uniform in (0, 1] */
735  uniform = 1.0 - pg_erand48(thread->random_state);
736 
737  return (int64) (-log(uniform) * ((double) center) + 0.5);
738 }
739 
740 /*
741  * Initialize the given SimpleStats struct to all zeroes
742  */
743 static void
745 {
746  memset(ss, 0, sizeof(SimpleStats));
747 }
748 
749 /*
750  * Accumulate one value into a SimpleStats struct.
751  */
752 static void
754 {
755  if (ss->count == 0 || val < ss->min)
756  ss->min = val;
757  if (ss->count == 0 || val > ss->max)
758  ss->max = val;
759  ss->count++;
760  ss->sum += val;
761  ss->sum2 += val * val;
762 }
763 
764 /*
765  * Merge two SimpleStats objects
766  */
767 static void
769 {
770  if (acc->count == 0 || ss->min < acc->min)
771  acc->min = ss->min;
772  if (acc->count == 0 || ss->max > acc->max)
773  acc->max = ss->max;
774  acc->count += ss->count;
775  acc->sum += ss->sum;
776  acc->sum2 += ss->sum2;
777 }
778 
779 /*
780  * Initialize a StatsData struct to mostly zeroes, with its start time set to
781  * the given value.
782  */
783 static void
785 {
786  sd->start_time = start_time;
787  sd->cnt = 0;
788  sd->skipped = 0;
789  initSimpleStats(&sd->latency);
790  initSimpleStats(&sd->lag);
791 }
792 
793 /*
794  * Accumulate one additional item into the given stats object.
795  */
796 static void
797 accumStats(StatsData *stats, bool skipped, double lat, double lag)
798 {
799  stats->cnt++;
800 
801  if (skipped)
802  {
803  /* no latency to record on skipped transactions */
804  stats->skipped++;
805  }
806  else
807  {
808  addToSimpleStats(&stats->latency, lat);
809 
810  /* and possibly the same for schedule lag */
811  if (throttle_delay)
812  addToSimpleStats(&stats->lag, lag);
813  }
814 }
815 
816 /* call PQexec() and exit() on failure */
817 static void
818 executeStatement(PGconn *con, const char *sql)
819 {
820  PGresult *res;
821 
822  res = PQexec(con, sql);
823  if (PQresultStatus(res) != PGRES_COMMAND_OK)
824  {
825  fprintf(stderr, "%s", PQerrorMessage(con));
826  exit(1);
827  }
828  PQclear(res);
829 }
830 
831 /* call PQexec() and complain, but without exiting, on failure */
832 static void
833 tryExecuteStatement(PGconn *con, const char *sql)
834 {
835  PGresult *res;
836 
837  res = PQexec(con, sql);
838  if (PQresultStatus(res) != PGRES_COMMAND_OK)
839  {
840  fprintf(stderr, "%s", PQerrorMessage(con));
841  fprintf(stderr, "(ignoring this error and continuing anyway)\n");
842  }
843  PQclear(res);
844 }
845 
846 /* set up a connection to the backend */
847 static PGconn *
849 {
850  PGconn *conn;
851  bool new_pass;
852  static bool have_password = false;
853  static char password[100];
854 
855  /*
856  * Start the connection. Loop until we have a password if requested by
857  * backend.
858  */
859  do
860  {
861 #define PARAMS_ARRAY_SIZE 7
862 
863  const char *keywords[PARAMS_ARRAY_SIZE];
864  const char *values[PARAMS_ARRAY_SIZE];
865 
866  keywords[0] = "host";
867  values[0] = pghost;
868  keywords[1] = "port";
869  values[1] = pgport;
870  keywords[2] = "user";
871  values[2] = login;
872  keywords[3] = "password";
873  values[3] = have_password ? password : NULL;
874  keywords[4] = "dbname";
875  values[4] = dbName;
876  keywords[5] = "fallback_application_name";
877  values[5] = progname;
878  keywords[6] = NULL;
879  values[6] = NULL;
880 
881  new_pass = false;
882 
883  conn = PQconnectdbParams(keywords, values, true);
884 
885  if (!conn)
886  {
887  fprintf(stderr, "connection to database \"%s\" failed\n",
888  dbName);
889  return NULL;
890  }
891 
892  if (PQstatus(conn) == CONNECTION_BAD &&
894  !have_password)
895  {
896  PQfinish(conn);
897  simple_prompt("Password: ", password, sizeof(password), false);
898  have_password = true;
899  new_pass = true;
900  }
901  } while (new_pass);
902 
903  /* check to see that the backend connection was successfully made */
904  if (PQstatus(conn) == CONNECTION_BAD)
905  {
906  fprintf(stderr, "connection to database \"%s\" failed:\n%s",
907  dbName, PQerrorMessage(conn));
908  PQfinish(conn);
909  return NULL;
910  }
911 
912  return conn;
913 }
914 
915 /* throw away response from backend */
916 static void
918 {
919  PGresult *res;
920 
921  do
922  {
923  res = PQgetResult(state->con);
924  if (res)
925  PQclear(res);
926  } while (res);
927 }
928 
929 /* qsort comparator for Variable array */
930 static int
931 compareVariableNames(const void *v1, const void *v2)
932 {
933  return strcmp(((const Variable *) v1)->name,
934  ((const Variable *) v2)->name);
935 }
936 
937 /* Locate a variable by name; returns NULL if unknown */
938 static Variable *
940 {
941  Variable key;
942 
943  /* On some versions of Solaris, bsearch of zero items dumps core */
944  if (st->nvariables <= 0)
945  return NULL;
946 
947  /* Sort if we have to */
948  if (!st->vars_sorted)
949  {
950  qsort((void *) st->variables, st->nvariables, sizeof(Variable),
952  st->vars_sorted = true;
953  }
954 
955  /* Now we can search */
956  key.name = name;
957  return (Variable *) bsearch((void *) &key,
958  (void *) st->variables,
959  st->nvariables,
960  sizeof(Variable),
962 }
963 
964 /* Get the value of a variable, in string form; returns NULL if unknown */
965 static char *
967 {
968  Variable *var;
969  char stringform[64];
970 
971  var = lookupVariable(st, name);
972  if (var == NULL)
973  return NULL; /* not found */
974 
975  if (var->value)
976  return var->value; /* we have it in string form */
977 
978  /* We need to produce a string equivalent of the numeric value */
979  Assert(var->is_numeric);
980  if (var->num_value.type == PGBT_INT)
981  snprintf(stringform, sizeof(stringform),
982  INT64_FORMAT, var->num_value.u.ival);
983  else
984  {
985  Assert(var->num_value.type == PGBT_DOUBLE);
986  snprintf(stringform, sizeof(stringform),
987  "%.*g", DBL_DIG, var->num_value.u.dval);
988  }
989  var->value = pg_strdup(stringform);
990  return var->value;
991 }
992 
993 /* Try to convert variable to numeric form; return false on failure */
994 static bool
996 {
997  if (var->is_numeric)
998  return true; /* no work */
999 
1000  if (is_an_int(var->value))
1001  {
1002  setIntValue(&var->num_value, strtoint64(var->value));
1003  var->is_numeric = true;
1004  }
1005  else /* type should be double */
1006  {
1007  double dv;
1008  char xs;
1009 
1010  if (sscanf(var->value, "%lf%c", &dv, &xs) != 1)
1011  {
1012  fprintf(stderr,
1013  "malformed variable \"%s\" value: \"%s\"\n",
1014  var->name, var->value);
1015  return false;
1016  }
1017  setDoubleValue(&var->num_value, dv);
1018  var->is_numeric = true;
1019  }
1020  return true;
1021 }
1022 
1023 /*
1024  * Check whether a variable's name is allowed.
1025  *
1026  * We allow any non-ASCII character, as well as ASCII letters, digits, and
1027  * underscore.
1028  *
1029  * Keep this in sync with the definitions of variable name characters in
1030  * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
1031  * "src/bin/pgbench/exprscan.l". Also see parseVariable(), below.
1032  *
1033  * Note: this static function is copied from "src/bin/psql/variables.c"
1034  */
1035 static bool
1037 {
1038  const unsigned char *ptr = (const unsigned char *) name;
1039 
1040  /* Mustn't be zero-length */
1041  if (*ptr == '\0')
1042  return false;
1043 
1044  while (*ptr)
1045  {
1046  if (IS_HIGHBIT_SET(*ptr) ||
1047  strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
1048  "_0123456789", *ptr) != NULL)
1049  ptr++;
1050  else
1051  return false;
1052  }
1053 
1054  return true;
1055 }
1056 
1057 /*
1058  * Lookup a variable by name, creating it if need be.
1059  * Caller is expected to assign a value to the variable.
1060  * Returns NULL on failure (bad name).
1061  */
1062 static Variable *
1063 lookupCreateVariable(CState *st, const char *context, char *name)
1064 {
1065  Variable *var;
1066 
1067  var = lookupVariable(st, name);
1068  if (var == NULL)
1069  {
1070  Variable *newvars;
1071 
1072  /*
1073  * Check for the name only when declaring a new variable to avoid
1074  * overhead.
1075  */
1076  if (!valid_variable_name(name))
1077  {
1078  fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1079  context, name);
1080  return NULL;
1081  }
1082 
1083  /* Create variable at the end of the array */
1084  if (st->variables)
1085  newvars = (Variable *) pg_realloc(st->variables,
1086  (st->nvariables + 1) * sizeof(Variable));
1087  else
1088  newvars = (Variable *) pg_malloc(sizeof(Variable));
1089 
1090  st->variables = newvars;
1091 
1092  var = &newvars[st->nvariables];
1093 
1094  var->name = pg_strdup(name);
1095  var->value = NULL;
1096  /* caller is expected to initialize remaining fields */
1097 
1098  st->nvariables++;
1099  /* we don't re-sort the array till we have to */
1100  st->vars_sorted = false;
1101  }
1102 
1103  return var;
1104 }
1105 
1106 /* Assign a string value to a variable, creating it if need be */
1107 /* Returns false on failure (bad name) */
1108 static bool
1109 putVariable(CState *st, const char *context, char *name, const char *value)
1110 {
1111  Variable *var;
1112  char *val;
1113 
1114  var = lookupCreateVariable(st, context, name);
1115  if (!var)
1116  return false;
1117 
1118  /* dup then free, in case value is pointing at this variable */
1119  val = pg_strdup(value);
1120 
1121  if (var->value)
1122  free(var->value);
1123  var->value = val;
1124  var->is_numeric = false;
1125 
1126  return true;
1127 }
1128 
1129 /* Assign a numeric value to a variable, creating it if need be */
1130 /* Returns false on failure (bad name) */
1131 static bool
1132 putVariableNumber(CState *st, const char *context, char *name,
1133  const PgBenchValue *value)
1134 {
1135  Variable *var;
1136 
1137  var = lookupCreateVariable(st, context, name);
1138  if (!var)
1139  return false;
1140 
1141  if (var->value)
1142  free(var->value);
1143  var->value = NULL;
1144  var->is_numeric = true;
1145  var->num_value = *value;
1146 
1147  return true;
1148 }
1149 
1150 /* Assign an integer value to a variable, creating it if need be */
1151 /* Returns false on failure (bad name) */
1152 static bool
1153 putVariableInt(CState *st, const char *context, char *name, int64 value)
1154 {
1155  PgBenchValue val;
1156 
1157  setIntValue(&val, value);
1158  return putVariableNumber(st, context, name, &val);
1159 }
1160 
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (the colon included).
 * Otherwise, return NULL; *eaten is not touched in that case.
 *
 * The accepted characters are the same as in valid_variable_name().
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 0;
	char	   *name;

	/*
	 * Scan forward over name characters, starting just after the colon.
	 *
	 * The explicit end-of-string test is required: strchr() treats the
	 * terminating NUL as part of the searched string, so without it a '\0'
	 * in sql[] would be taken for a name character and the scan would run
	 * off the end of the buffer.
	 */
	do
	{
		i++;
	} while (sql[i] != '\0' &&
			 (IS_HIGHBIT_SET(sql[i]) ||
			  strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
					 "_0123456789", sql[i]) != NULL));

	if (i == 1)
		return NULL;			/* no valid variable name chars */

	/* i counts the colon too, so the name itself is i - 1 bytes plus NUL */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1191 
/*
 * Replace the "len" bytes at "param" (which points into the string *sql)
 * with the text "value".
 *
 * *sql is enlarged with pg_realloc() when the replacement is longer than the
 * text it replaces, so *sql may change.  Returns a pointer just past the
 * inserted text, within the (possibly moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			newlen = strlen(value);

	if (newlen > len)
	{
		/* remember where we are as an offset; the buffer may move */
		size_t		pos = param - *sql;
		size_t		needed = strlen(*sql) - len + newlen + 1;

		*sql = pg_realloc(*sql, needed);
		param = *sql + pos;
	}

	if (newlen != len)
	{
		char	   *tail = param + len;

		/* shift the rest of the string, including its terminating NUL */
		memmove(param + newlen, tail, strlen(tail) + 1);
	}

	memcpy(param, value, newlen);

	return param + newlen;
}
1211 
1212 static char *
1213 assignVariables(CState *st, char *sql)
1214 {
1215  char *p,
1216  *name,
1217  *val;
1218 
1219  p = sql;
1220  while ((p = strchr(p, ':')) != NULL)
1221  {
1222  int eaten;
1223 
1224  name = parseVariable(p, &eaten);
1225  if (name == NULL)
1226  {
1227  while (*p == ':')
1228  {
1229  p++;
1230  }
1231  continue;
1232  }
1233 
1234  val = getVariable(st, name);
1235  free(name);
1236  if (val == NULL)
1237  {
1238  p++;
1239  continue;
1240  }
1241 
1242  p = replaceVariable(&sql, p, eaten, val);
1243  }
1244 
1245  return sql;
1246 }
1247 
1248 static void
1249 getQueryParams(CState *st, const Command *command, const char **params)
1250 {
1251  int i;
1252 
1253  for (i = 0; i < command->argc - 1; i++)
1254  params[i] = getVariable(st, command->argv[i + 1]);
1255 }
1256 
1257 /* get a value as an int, tell if there is a problem */
1258 static bool
1259 coerceToInt(PgBenchValue *pval, int64 *ival)
1260 {
1261  if (pval->type == PGBT_INT)
1262  {
1263  *ival = pval->u.ival;
1264  return true;
1265  }
1266  else
1267  {
1268  double dval = pval->u.dval;
1269 
1270  Assert(pval->type == PGBT_DOUBLE);
1271  if (dval < PG_INT64_MIN || PG_INT64_MAX < dval)
1272  {
1273  fprintf(stderr, "double to int overflow for %f\n", dval);
1274  return false;
1275  }
1276  *ival = (int64) dval;
1277  return true;
1278  }
1279 }
1280 
1281 /* get a value as a double, or tell if there is a problem */
1282 static bool
1283 coerceToDouble(PgBenchValue *pval, double *dval)
1284 {
1285  if (pval->type == PGBT_DOUBLE)
1286  {
1287  *dval = pval->u.dval;
1288  return true;
1289  }
1290  else
1291  {
1292  Assert(pval->type == PGBT_INT);
1293  *dval = (double) pval->u.ival;
1294  return true;
1295  }
1296 }
1297 
1298 /* assign an integer value */
1299 static void
1300 setIntValue(PgBenchValue *pv, int64 ival)
1301 {
1302  pv->type = PGBT_INT;
1303  pv->u.ival = ival;
1304 }
1305 
1306 /* assign a double value */
1307 static void
1308 setDoubleValue(PgBenchValue *pv, double dval)
1309 {
1310  pv->type = PGBT_DOUBLE;
1311  pv->u.dval = dval;
1312 }
1313 
1314 /* maximum number of function arguments */
1315 #define MAX_FARGS 16
1316 
1317 /*
1318  * Recursive evaluation of functions
1319  */
1320 static bool
1321 evalFunc(TState *thread, CState *st,
1323 {
1324  /* evaluate all function arguments */
1325  int nargs = 0;
1326  PgBenchValue vargs[MAX_FARGS];
1327  PgBenchExprLink *l = args;
1328 
1329  for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1330  if (!evaluateExpr(thread, st, l->expr, &vargs[nargs]))
1331  return false;
1332 
1333  if (l != NULL)
1334  {
1335  fprintf(stderr,
1336  "too many function arguments, maximum is %d\n", MAX_FARGS);
1337  return false;
1338  }
1339 
1340  /* then evaluate function */
1341  switch (func)
1342  {
1343  /* overloaded operators */
1344  case PGBENCH_ADD:
1345  case PGBENCH_SUB:
1346  case PGBENCH_MUL:
1347  case PGBENCH_DIV:
1348  case PGBENCH_MOD:
1349  {
1350  PgBenchValue *lval = &vargs[0],
1351  *rval = &vargs[1];
1352 
1353  Assert(nargs == 2);
1354 
1355  /* overloaded type management, double if some double */
1356  if ((lval->type == PGBT_DOUBLE ||
1357  rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1358  {
1359  double ld,
1360  rd;
1361 
1362  if (!coerceToDouble(lval, &ld) ||
1363  !coerceToDouble(rval, &rd))
1364  return false;
1365 
1366  switch (func)
1367  {
1368  case PGBENCH_ADD:
1369  setDoubleValue(retval, ld + rd);
1370  return true;
1371 
1372  case PGBENCH_SUB:
1373  setDoubleValue(retval, ld - rd);
1374  return true;
1375 
1376  case PGBENCH_MUL:
1377  setDoubleValue(retval, ld * rd);
1378  return true;
1379 
1380  case PGBENCH_DIV:
1381  setDoubleValue(retval, ld / rd);
1382  return true;
1383 
1384  default:
1385  /* cannot get here */
1386  Assert(0);
1387  }
1388  }
1389  else /* we have integer operands, or % */
1390  {
1391  int64 li,
1392  ri;
1393 
1394  if (!coerceToInt(lval, &li) ||
1395  !coerceToInt(rval, &ri))
1396  return false;
1397 
1398  switch (func)
1399  {
1400  case PGBENCH_ADD:
1401  setIntValue(retval, li + ri);
1402  return true;
1403 
1404  case PGBENCH_SUB:
1405  setIntValue(retval, li - ri);
1406  return true;
1407 
1408  case PGBENCH_MUL:
1409  setIntValue(retval, li * ri);
1410  return true;
1411 
1412  case PGBENCH_DIV:
1413  case PGBENCH_MOD:
1414  if (ri == 0)
1415  {
1416  fprintf(stderr, "division by zero\n");
1417  return false;
1418  }
1419  /* special handling of -1 divisor */
1420  if (ri == -1)
1421  {
1422  if (func == PGBENCH_DIV)
1423  {
1424  /* overflow check (needed for INT64_MIN) */
1425  if (li == PG_INT64_MIN)
1426  {
1427  fprintf(stderr, "bigint out of range\n");
1428  return false;
1429  }
1430  else
1431  setIntValue(retval, -li);
1432  }
1433  else
1434  setIntValue(retval, 0);
1435  return true;
1436  }
1437  /* else divisor is not -1 */
1438  if (func == PGBENCH_DIV)
1439  setIntValue(retval, li / ri);
1440  else /* func == PGBENCH_MOD */
1441  setIntValue(retval, li % ri);
1442 
1443  return true;
1444 
1445  default:
1446  /* cannot get here */
1447  Assert(0);
1448  }
1449  }
1450  }
1451 
1452  /* no arguments */
1453  case PGBENCH_PI:
1454  setDoubleValue(retval, M_PI);
1455  return true;
1456 
1457  /* 1 overloaded argument */
1458  case PGBENCH_ABS:
1459  {
1460  PgBenchValue *varg = &vargs[0];
1461 
1462  Assert(nargs == 1);
1463 
1464  if (varg->type == PGBT_INT)
1465  {
1466  int64 i = varg->u.ival;
1467 
1468  setIntValue(retval, i < 0 ? -i : i);
1469  }
1470  else
1471  {
1472  double d = varg->u.dval;
1473 
1474  Assert(varg->type == PGBT_DOUBLE);
1475  setDoubleValue(retval, d < 0.0 ? -d : d);
1476  }
1477 
1478  return true;
1479  }
1480 
1481  case PGBENCH_DEBUG:
1482  {
1483  PgBenchValue *varg = &vargs[0];
1484 
1485  Assert(nargs == 1);
1486 
1487  fprintf(stderr, "debug(script=%d,command=%d): ",
1488  st->use_file, st->command + 1);
1489 
1490  if (varg->type == PGBT_INT)
1491  fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
1492  else
1493  {
1494  Assert(varg->type == PGBT_DOUBLE);
1495  fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
1496  }
1497 
1498  *retval = *varg;
1499 
1500  return true;
1501  }
1502 
1503  /* 1 double argument */
1504  case PGBENCH_DOUBLE:
1505  case PGBENCH_SQRT:
1506  {
1507  double dval;
1508 
1509  Assert(nargs == 1);
1510 
1511  if (!coerceToDouble(&vargs[0], &dval))
1512  return false;
1513 
1514  if (func == PGBENCH_SQRT)
1515  dval = sqrt(dval);
1516 
1517  setDoubleValue(retval, dval);
1518  return true;
1519  }
1520 
1521  /* 1 int argument */
1522  case PGBENCH_INT:
1523  {
1524  int64 ival;
1525 
1526  Assert(nargs == 1);
1527 
1528  if (!coerceToInt(&vargs[0], &ival))
1529  return false;
1530 
1531  setIntValue(retval, ival);
1532  return true;
1533  }
1534 
1535  /* variable number of arguments */
1536  case PGBENCH_LEAST:
1537  case PGBENCH_GREATEST:
1538  {
1539  bool havedouble;
1540  int i;
1541 
1542  Assert(nargs >= 1);
1543 
1544  /* need double result if any input is double */
1545  havedouble = false;
1546  for (i = 0; i < nargs; i++)
1547  {
1548  if (vargs[i].type == PGBT_DOUBLE)
1549  {
1550  havedouble = true;
1551  break;
1552  }
1553  }
1554  if (havedouble)
1555  {
1556  double extremum;
1557 
1558  if (!coerceToDouble(&vargs[0], &extremum))
1559  return false;
1560  for (i = 1; i < nargs; i++)
1561  {
1562  double dval;
1563 
1564  if (!coerceToDouble(&vargs[i], &dval))
1565  return false;
1566  if (func == PGBENCH_LEAST)
1567  extremum = Min(extremum, dval);
1568  else
1569  extremum = Max(extremum, dval);
1570  }
1571  setDoubleValue(retval, extremum);
1572  }
1573  else
1574  {
1575  int64 extremum;
1576 
1577  if (!coerceToInt(&vargs[0], &extremum))
1578  return false;
1579  for (i = 1; i < nargs; i++)
1580  {
1581  int64 ival;
1582 
1583  if (!coerceToInt(&vargs[i], &ival))
1584  return false;
1585  if (func == PGBENCH_LEAST)
1586  extremum = Min(extremum, ival);
1587  else
1588  extremum = Max(extremum, ival);
1589  }
1590  setIntValue(retval, extremum);
1591  }
1592  return true;
1593  }
1594 
1595  /* random functions */
1596  case PGBENCH_RANDOM:
1599  {
1600  int64 imin,
1601  imax;
1602 
1603  Assert(nargs >= 2);
1604 
1605  if (!coerceToInt(&vargs[0], &imin) ||
1606  !coerceToInt(&vargs[1], &imax))
1607  return false;
1608 
1609  /* check random range */
1610  if (imin > imax)
1611  {
1612  fprintf(stderr, "empty range given to random\n");
1613  return false;
1614  }
1615  else if (imax - imin < 0 || (imax - imin) + 1 < 0)
1616  {
1617  /* prevent int overflows in random functions */
1618  fprintf(stderr, "random range is too large\n");
1619  return false;
1620  }
1621 
1622  if (func == PGBENCH_RANDOM)
1623  {
1624  Assert(nargs == 2);
1625  setIntValue(retval, getrand(thread, imin, imax));
1626  }
1627  else /* gaussian & exponential */
1628  {
1629  double param;
1630 
1631  Assert(nargs == 3);
1632 
1633  if (!coerceToDouble(&vargs[2], &param))
1634  return false;
1635 
1636  if (func == PGBENCH_RANDOM_GAUSSIAN)
1637  {
1638  if (param < MIN_GAUSSIAN_PARAM)
1639  {
1640  fprintf(stderr,
1641  "gaussian parameter must be at least %f "
1642  "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
1643  return false;
1644  }
1645 
1646  setIntValue(retval,
1647  getGaussianRand(thread, imin, imax, param));
1648  }
1649  else /* exponential */
1650  {
1651  if (param <= 0.0)
1652  {
1653  fprintf(stderr,
1654  "exponential parameter must be greater than zero"
1655  " (got %f)\n", param);
1656  return false;
1657  }
1658 
1659  setIntValue(retval,
1660  getExponentialRand(thread, imin, imax, param));
1661  }
1662  }
1663 
1664  return true;
1665  }
1666 
1667  default:
1668  /* cannot get here */
1669  Assert(0);
1670  /* dead code to avoid a compiler warning */
1671  return false;
1672  }
1673 }
1674 
1675 /*
1676  * Recursive evaluation of an expression in a pgbench script
1677  * using the current state of variables.
1678  * Returns whether the evaluation was ok,
1679  * the value itself is returned through the retval pointer.
1680  */
1681 static bool
1682 evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval)
1683 {
1684  switch (expr->etype)
1685  {
1686  case ENODE_CONSTANT:
1687  {
1688  *retval = expr->u.constant;
1689  return true;
1690  }
1691 
1692  case ENODE_VARIABLE:
1693  {
1694  Variable *var;
1695 
1696  if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
1697  {
1698  fprintf(stderr, "undefined variable \"%s\"\n",
1699  expr->u.variable.varname);
1700  return false;
1701  }
1702 
1703  if (!makeVariableNumeric(var))
1704  return false;
1705 
1706  *retval = var->num_value;
1707  return true;
1708  }
1709 
1710  case ENODE_FUNCTION:
1711  return evalFunc(thread, st,
1712  expr->u.function.function,
1713  expr->u.function.args,
1714  retval);
1715 
1716  default:
1717  /* internal error which should never occur */
1718  fprintf(stderr, "unexpected enode type in evaluation: %d\n",
1719  expr->etype);
1720  exit(1);
1721  }
1722 }
1723 
1724 /*
1725  * Run a shell command. The result is assigned to the variable if not NULL.
1726  * Return true if succeeded, or false on error.
1727  */
1728 static bool
1729 runShellCommand(CState *st, char *variable, char **argv, int argc)
1730 {
1731  char command[SHELL_COMMAND_SIZE];
1732  int i,
1733  len = 0;
1734  FILE *fp;
1735  char res[64];
1736  char *endptr;
1737  int retval;
1738 
1739  /*----------
1740  * Join arguments with whitespace separators. Arguments starting with
1741  * exactly one colon are treated as variables:
1742  * name - append a string "name"
1743  * :var - append a variable named 'var'
1744  * ::name - append a string ":name"
1745  *----------
1746  */
1747  for (i = 0; i < argc; i++)
1748  {
1749  char *arg;
1750  int arglen;
1751 
1752  if (argv[i][0] != ':')
1753  {
1754  arg = argv[i]; /* a string literal */
1755  }
1756  else if (argv[i][1] == ':')
1757  {
1758  arg = argv[i] + 1; /* a string literal starting with colons */
1759  }
1760  else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1761  {
1762  fprintf(stderr, "%s: undefined variable \"%s\"\n",
1763  argv[0], argv[i]);
1764  return false;
1765  }
1766 
1767  arglen = strlen(arg);
1768  if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1769  {
1770  fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1771  return false;
1772  }
1773 
1774  if (i > 0)
1775  command[len++] = ' ';
1776  memcpy(command + len, arg, arglen);
1777  len += arglen;
1778  }
1779 
1780  command[len] = '\0';
1781 
1782  /* Fast path for non-assignment case */
1783  if (variable == NULL)
1784  {
1785  if (system(command))
1786  {
1787  if (!timer_exceeded)
1788  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1789  return false;
1790  }
1791  return true;
1792  }
1793 
1794  /* Execute the command with pipe and read the standard output. */
1795  if ((fp = popen(command, "r")) == NULL)
1796  {
1797  fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1798  return false;
1799  }
1800  if (fgets(res, sizeof(res), fp) == NULL)
1801  {
1802  if (!timer_exceeded)
1803  fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1804  (void) pclose(fp);
1805  return false;
1806  }
1807  if (pclose(fp) < 0)
1808  {
1809  fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1810  return false;
1811  }
1812 
1813  /* Check whether the result is an integer and assign it to the variable */
1814  retval = (int) strtol(res, &endptr, 10);
1815  while (*endptr != '\0' && isspace((unsigned char) *endptr))
1816  endptr++;
1817  if (*res == '\0' || *endptr != '\0')
1818  {
1819  fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1820  argv[0], res);
1821  return false;
1822  }
1823  if (!putVariableInt(st, "setshell", variable, retval))
1824  return false;
1825 
1826 #ifdef DEBUG
1827  printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
1828 #endif
1829  return true;
1830 }
1831 
/*
 * Size of the buffer callers must supply to preparedStatementName().
 * "P", two decimal ints, "_" and the terminating NUL need at most 25 bytes.
 */
#define MAX_PREPARE_NAME 32

/*
 * Build the name "P<file>_<state>" into buffer, which must have room for
 * MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1838 
1839 static void
1840 commandFailed(CState *st, char *message)
1841 {
1842  fprintf(stderr,
1843  "client %d aborted in command %d of script %d; %s\n",
1844  st->id, st->command, st->use_file, message);
1845 }
1846 
1847 /* return a script number with a weighted choice. */
1848 static int
1850 {
1851  int i = 0;
1852  int64 w;
1853 
1854  if (num_scripts == 1)
1855  return 0;
1856 
1857  w = getrand(thread, 0, total_weight - 1);
1858  do
1859  {
1860  w -= sql_script[i++].weight;
1861  } while (w >= 0);
1862 
1863  return i - 1;
1864 }
1865 
1866 /* Send a SQL command, using the chosen querymode */
1867 static bool
1869 {
1870  int r;
1871 
1872  if (querymode == QUERY_SIMPLE)
1873  {
1874  char *sql;
1875 
1876  sql = pg_strdup(command->argv[0]);
1877  sql = assignVariables(st, sql);
1878 
1879  if (debug)
1880  fprintf(stderr, "client %d sending %s\n", st->id, sql);
1881  r = PQsendQuery(st->con, sql);
1882  free(sql);
1883  }
1884  else if (querymode == QUERY_EXTENDED)
1885  {
1886  const char *sql = command->argv[0];
1887  const char *params[MAX_ARGS];
1888 
1889  getQueryParams(st, command, params);
1890 
1891  if (debug)
1892  fprintf(stderr, "client %d sending %s\n", st->id, sql);
1893  r = PQsendQueryParams(st->con, sql, command->argc - 1,
1894  NULL, params, NULL, NULL, 0);
1895  }
1896  else if (querymode == QUERY_PREPARED)
1897  {
1898  char name[MAX_PREPARE_NAME];
1899  const char *params[MAX_ARGS];
1900 
1901  if (!st->prepared[st->use_file])
1902  {
1903  int j;
1904  Command **commands = sql_script[st->use_file].commands;
1905 
1906  for (j = 0; commands[j] != NULL; j++)
1907  {
1908  PGresult *res;
1909  char name[MAX_PREPARE_NAME];
1910 
1911  if (commands[j]->type != SQL_COMMAND)
1912  continue;
1913  preparedStatementName(name, st->use_file, j);
1914  res = PQprepare(st->con, name,
1915  commands[j]->argv[0], commands[j]->argc - 1, NULL);
1916  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1917  fprintf(stderr, "%s", PQerrorMessage(st->con));
1918  PQclear(res);
1919  }
1920  st->prepared[st->use_file] = true;
1921  }
1922 
1923  getQueryParams(st, command, params);
1924  preparedStatementName(name, st->use_file, st->command);
1925 
1926  if (debug)
1927  fprintf(stderr, "client %d sending %s\n", st->id, name);
1928  r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1929  params, NULL, NULL, 0);
1930  }
1931  else /* unknown sql mode */
1932  r = 0;
1933 
1934  if (r == 0)
1935  {
1936  if (debug)
1937  fprintf(stderr, "client %d could not send %s\n",
1938  st->id, command->argv[0]);
1939  st->ecnt++;
1940  return false;
1941  }
1942  else
1943  return true;
1944 }
1945 
1946 /*
1947  * Parse the argument to a \sleep command, and return the requested amount
1948  * of delay, in microseconds. Returns true on success, false on error.
1949  */
1950 static bool
1951 evaluateSleep(CState *st, int argc, char **argv, int *usecs)
1952 {
1953  char *var;
1954  int usec;
1955 
1956  if (*argv[1] == ':')
1957  {
1958  if ((var = getVariable(st, argv[1] + 1)) == NULL)
1959  {
1960  fprintf(stderr, "%s: undefined variable \"%s\"\n",
1961  argv[0], argv[1]);
1962  return false;
1963  }
1964  usec = atoi(var);
1965  }
1966  else
1967  usec = atoi(argv[1]);
1968 
1969  if (argc > 2)
1970  {
1971  if (pg_strcasecmp(argv[2], "ms") == 0)
1972  usec *= 1000;
1973  else if (pg_strcasecmp(argv[2], "s") == 0)
1974  usec *= 1000000;
1975  }
1976  else
1977  usec *= 1000000;
1978 
1979  *usecs = usec;
1980  return true;
1981 }
1982 
1983 /*
1984  * Advance the state machine of a connection, if possible.
1985  */
1986 static void
1987 doCustom(TState *thread, CState *st, StatsData *agg)
1988 {
1989  PGresult *res;
1990  Command *command;
1991  instr_time now;
1992  bool end_tx_processed = false;
1993  int64 wait;
1994 
1995  /*
1996  * gettimeofday() isn't free, so we get the current timestamp lazily the
1997  * first time it's needed, and reuse the same value throughout this
1998  * function after that. This also ensures that e.g. the calculated
1999  * latency reported in the log file and in the totals are the same. Zero
2000  * means "not set yet". Reset "now" when we execute shell commands or
2001  * expressions, which might take a non-negligible amount of time, though.
2002  */
2003  INSTR_TIME_SET_ZERO(now);
2004 
2005  /*
2006  * Loop in the state machine, until we have to wait for a result from the
2007  * server (or have to sleep, for throttling or for \sleep).
2008  *
2009  * Note: In the switch-statement below, 'break' will loop back here,
2010  * meaning "continue in the state machine". Return is used to return to
2011  * the caller.
2012  */
2013  for (;;)
2014  {
2015  switch (st->state)
2016  {
2017  /*
2018  * Select transaction to run.
2019  */
2020  case CSTATE_CHOOSE_SCRIPT:
2021 
2022  st->use_file = chooseScript(thread);
2023 
2024  if (debug)
2025  fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
2026  sql_script[st->use_file].desc);
2027 
2028  if (throttle_delay > 0)
2030  else
2031  st->state = CSTATE_START_TX;
2032  break;
2033 
2034  /*
2035  * Handle throttling once per transaction by sleeping.
2036  */
2037  case CSTATE_START_THROTTLE:
2038 
2039  /*
2040  * Generate a delay such that the series of delays will
2041  * approximate a Poisson distribution centered on the
2042  * throttle_delay time.
2043  *
2044  * If transactions are too slow or a given wait is shorter
2045  * than a transaction, the next transaction will start right
2046  * away.
2047  */
2048  Assert(throttle_delay > 0);
2049  wait = getPoissonRand(thread, throttle_delay);
2050 
2051  thread->throttle_trigger += wait;
2052  st->txn_scheduled = thread->throttle_trigger;
2053 
2054  /*
2055  * stop client if next transaction is beyond pgbench end of
2056  * execution
2057  */
2058  if (duration > 0 && st->txn_scheduled > end_time)
2059  {
2060  st->state = CSTATE_FINISHED;
2061  break;
2062  }
2063 
2064  /*
2065  * If --latency-limit is used, and this slot is already late
2066  * so that the transaction will miss the latency limit even if
2067  * it completed immediately, we skip this time slot and
2068  * iterate till the next slot that isn't late yet. But don't
2069  * iterate beyond the -t limit, if one is given.
2070  */
2071  if (latency_limit)
2072  {
2073  int64 now_us;
2074 
2075  if (INSTR_TIME_IS_ZERO(now))
2077  now_us = INSTR_TIME_GET_MICROSEC(now);
2078  while (thread->throttle_trigger < now_us - latency_limit &&
2079  (nxacts <= 0 || st->cnt < nxacts))
2080  {
2081  processXactStats(thread, st, &now, true, agg);
2082  /* next rendez-vous */
2083  wait = getPoissonRand(thread, throttle_delay);
2084  thread->throttle_trigger += wait;
2085  st->txn_scheduled = thread->throttle_trigger;
2086  }
2087  /* stop client if -t exceeded */
2088  if (nxacts > 0 && st->cnt >= nxacts)
2089  {
2090  st->state = CSTATE_FINISHED;
2091  break;
2092  }
2093  }
2094 
2095  st->state = CSTATE_THROTTLE;
2096  if (debug)
2097  fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
2098  st->id, wait);
2099  break;
2100 
2101  /*
2102  * Wait until it's time to start next transaction.
2103  */
2104  case CSTATE_THROTTLE:
2105  if (INSTR_TIME_IS_ZERO(now))
2107  if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
2108  return; /* Still sleeping, nothing to do here */
2109 
2110  /* Else done sleeping, start the transaction */
2111  st->state = CSTATE_START_TX;
2112  break;
2113 
2114  /* Start new transaction */
2115  case CSTATE_START_TX:
2116 
2117  /*
2118  * Establish connection on first call, or if is_connect is
2119  * true.
2120  */
2121  if (st->con == NULL)
2122  {
2123  instr_time start;
2124 
2125  if (INSTR_TIME_IS_ZERO(now))
2127  start = now;
2128  if ((st->con = doConnect()) == NULL)
2129  {
2130  fprintf(stderr, "client %d aborted while establishing connection\n",
2131  st->id);
2132  st->state = CSTATE_ABORTED;
2133  break;
2134  }
2136  INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
2137 
2138  /* Reset session-local state */
2139  memset(st->prepared, 0, sizeof(st->prepared));
2140  }
2141 
2142  /*
2143  * Record transaction start time under logging, progress or
2144  * throttling.
2145  */
2146  if (use_log || progress || throttle_delay || latency_limit ||
2147  per_script_stats)
2148  {
2149  if (INSTR_TIME_IS_ZERO(now))
2151  st->txn_begin = now;
2152 
2153  /*
2154  * When not throttling, this is also the transaction's
2155  * scheduled start time.
2156  */
2157  if (!throttle_delay)
2159  }
2160 
2161  /* Begin with the first command */
2162  st->command = 0;
2164  break;
2165 
2166  /*
2167  * Send a command to server (or execute a meta-command)
2168  */
2169  case CSTATE_START_COMMAND:
2170  command = sql_script[st->use_file].commands[st->command];
2171 
2172  /*
2173  * If we reached the end of the script, move to end-of-xact
2174  * processing.
2175  */
2176  if (command == NULL)
2177  {
2178  st->state = CSTATE_END_TX;
2179  break;
2180  }
2181 
2182  /*
2183  * Record statement start time if per-command latencies are
2184  * requested
2185  */
2186  if (is_latencies)
2187  {
2188  if (INSTR_TIME_IS_ZERO(now))
2190  st->stmt_begin = now;
2191  }
2192 
2193  if (command->type == SQL_COMMAND)
2194  {
2195  if (!sendCommand(st, command))
2196  {
2197  commandFailed(st, "SQL command send failed");
2198  st->state = CSTATE_ABORTED;
2199  }
2200  else
2201  st->state = CSTATE_WAIT_RESULT;
2202  }
2203  else if (command->type == META_COMMAND)
2204  {
2205  int argc = command->argc,
2206  i;
2207  char **argv = command->argv;
2208 
2209  if (debug)
2210  {
2211  fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
2212  for (i = 1; i < argc; i++)
2213  fprintf(stderr, " %s", argv[i]);
2214  fprintf(stderr, "\n");
2215  }
2216 
2217  if (pg_strcasecmp(argv[0], "sleep") == 0)
2218  {
2219  /*
2220  * A \sleep doesn't execute anything, we just get the
2221  * delay from the argument, and enter the CSTATE_SLEEP
2222  * state. (The per-command latency will be recorded
2223  * in CSTATE_SLEEP state, not here, after the delay
2224  * has elapsed.)
2225  */
2226  int usec;
2227 
2228  if (!evaluateSleep(st, argc, argv, &usec))
2229  {
2230  commandFailed(st, "execution of meta-command 'sleep' failed");
2231  st->state = CSTATE_ABORTED;
2232  break;
2233  }
2234 
2235  if (INSTR_TIME_IS_ZERO(now))
2237  st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
2238  st->state = CSTATE_SLEEP;
2239  break;
2240  }
2241  else
2242  {
2243  if (pg_strcasecmp(argv[0], "set") == 0)
2244  {
2245  PgBenchExpr *expr = command->expr;
2246  PgBenchValue result;
2247 
2248  if (!evaluateExpr(thread, st, expr, &result))
2249  {
2250  commandFailed(st, "evaluation of meta-command 'set' failed");
2251  st->state = CSTATE_ABORTED;
2252  break;
2253  }
2254 
2255  if (!putVariableNumber(st, argv[0], argv[1], &result))
2256  {
2257  commandFailed(st, "assignment of meta-command 'set' failed");
2258  st->state = CSTATE_ABORTED;
2259  break;
2260  }
2261  }
2262  else if (pg_strcasecmp(argv[0], "setshell") == 0)
2263  {
2264  bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
2265 
2266  if (timer_exceeded) /* timeout */
2267  {
2268  st->state = CSTATE_FINISHED;
2269  break;
2270  }
2271  else if (!ret) /* on error */
2272  {
2273  commandFailed(st, "execution of meta-command 'setshell' failed");
2274  st->state = CSTATE_ABORTED;
2275  break;
2276  }
2277  else
2278  {
2279  /* succeeded */
2280  }
2281  }
2282  else if (pg_strcasecmp(argv[0], "shell") == 0)
2283  {
2284  bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
2285 
2286  if (timer_exceeded) /* timeout */
2287  {
2288  st->state = CSTATE_FINISHED;
2289  break;
2290  }
2291  else if (!ret) /* on error */
2292  {
2293  commandFailed(st, "execution of meta-command 'shell' failed");
2294  st->state = CSTATE_ABORTED;
2295  break;
2296  }
2297  else
2298  {
2299  /* succeeded */
2300  }
2301  }
2302 
2303  /*
2304  * executing the expression or shell command might
2305  * take a non-negligible amount of time, so reset
2306  * 'now'
2307  */
2308  INSTR_TIME_SET_ZERO(now);
2309 
2310  st->state = CSTATE_END_COMMAND;
2311  }
2312  }
2313  break;
2314 
2315  /*
2316  * Wait for the current SQL command to complete
2317  */
2318  case CSTATE_WAIT_RESULT:
2319  command = sql_script[st->use_file].commands[st->command];
2320  if (debug)
2321  fprintf(stderr, "client %d receiving\n", st->id);
2322  if (!PQconsumeInput(st->con))
2323  { /* there's something wrong */
2324  commandFailed(st, "perhaps the backend died while processing");
2325  st->state = CSTATE_ABORTED;
2326  break;
2327  }
2328  if (PQisBusy(st->con))
2329  return; /* don't have the whole result yet */
2330 
2331  /*
2332  * Read and discard the query result;
2333  */
2334  res = PQgetResult(st->con);
2335  switch (PQresultStatus(res))
2336  {
2337  case PGRES_COMMAND_OK:
2338  case PGRES_TUPLES_OK:
2339  case PGRES_EMPTY_QUERY:
2340  /* OK */
2341  PQclear(res);
2342  discard_response(st);
2343  st->state = CSTATE_END_COMMAND;
2344  break;
2345  default:
2346  commandFailed(st, PQerrorMessage(st->con));
2347  PQclear(res);
2348  st->state = CSTATE_ABORTED;
2349  break;
2350  }
2351  break;
2352 
2353  /*
2354  * Wait until sleep is done. This state is entered after a
2355  * \sleep metacommand. The behavior is similar to
2356  * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
2357  * instead of CSTATE_START_TX.
2358  */
2359  case CSTATE_SLEEP:
2360  if (INSTR_TIME_IS_ZERO(now))
2362  if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
2363  return; /* Still sleeping, nothing to do here */
2364  /* Else done sleeping. */
2365  st->state = CSTATE_END_COMMAND;
2366  break;
2367 
2368  /*
2369  * End of command: record stats and proceed to next command.
2370  */
2371  case CSTATE_END_COMMAND:
2372 
2373  /*
2374  * command completed: accumulate per-command execution times
2375  * in thread-local data structure, if per-command latencies
2376  * are requested.
2377  */
2378  if (is_latencies)
2379  {
2380  if (INSTR_TIME_IS_ZERO(now))
2382 
2383  /* XXX could use a mutex here, but we choose not to */
2384  command = sql_script[st->use_file].commands[st->command];
2385  addToSimpleStats(&command->stats,
2386  INSTR_TIME_GET_DOUBLE(now) -
2388  }
2389 
2390  /* Go ahead with next command */
2391  st->command++;
2393  break;
2394 
2395  /*
2396  * End of transaction.
2397  */
2398  case CSTATE_END_TX:
2399 
2400  /* transaction finished: calculate latency and do log */
2401  processXactStats(thread, st, &now, false, agg);
2402 
2403  if (is_connect)
2404  {
2405  PQfinish(st->con);
2406  st->con = NULL;
2407  INSTR_TIME_SET_ZERO(now);
2408  }
2409 
2410  if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
2411  {
2412  /* exit success */
2413  st->state = CSTATE_FINISHED;
2414  break;
2415  }
2416 
2417  /*
2418  * No transaction is underway anymore.
2419  */
2421 
2422  /*
2423  * If we paced through all commands in the script in this
2424  * loop, without returning to the caller even once, do it now.
2425  * This gives the thread a chance to process other
2426  * connections, and to do progress reporting. This can
2427  * currently only happen if the script consists entirely of
2428  * meta-commands.
2429  */
2430  if (end_tx_processed)
2431  return;
2432  else
2433  {
2434  end_tx_processed = true;
2435  break;
2436  }
2437 
2438  /*
2439  * Final states. Close the connection if it's still open.
2440  */
2441  case CSTATE_ABORTED:
2442  case CSTATE_FINISHED:
2443  if (st->con != NULL)
2444  {
2445  PQfinish(st->con);
2446  st->con = NULL;
2447  }
2448  return;
2449  }
2450  }
2451 }
2452 
2453 /*
2454  * Print log entry after completing one transaction.
2455  *
2456  * We print Unix-epoch timestamps in the log, so that entries can be
2457  * correlated against other logs. On some platforms this could be obtained
2458  * from the instr_time reading the caller has, but rather than get entangled
2459  * with that, we just eat the cost of an extra syscall in all cases.
2460  */
2461 static void
2462 doLog(TState *thread, CState *st,
2463  StatsData *agg, bool skipped, double latency, double lag)
2464 {
2465  FILE *logfile = thread->logfile;
2466 
2467  Assert(use_log);
2468 
2469  /*
2470  * Skip the log entry if sampling is enabled and this row doesn't belong
2471  * to the random sample.
2472  */
2473  if (sample_rate != 0.0 &&
2474  pg_erand48(thread->random_state) > sample_rate)
2475  return;
2476 
2477  /* should we aggregate the results or not? */
2478  if (agg_interval > 0)
2479  {
2480  /*
2481  * Loop until we reach the interval of the current moment, and print
2482  * any empty intervals in between (this may happen with very low tps,
2483  * e.g. --rate=0.1).
2484  */
2485  time_t now = time(NULL);
2486 
2487  while (agg->start_time + agg_interval <= now)
2488  {
2489  /* print aggregated report to logfile */
2490  fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
2491  (long) agg->start_time,
2492  agg->cnt,
2493  agg->latency.sum,
2494  agg->latency.sum2,
2495  agg->latency.min,
2496  agg->latency.max);
2497  if (throttle_delay)
2498  {
2499  fprintf(logfile, " %.0f %.0f %.0f %.0f",
2500  agg->lag.sum,
2501  agg->lag.sum2,
2502  agg->lag.min,
2503  agg->lag.max);
2504  if (latency_limit)
2505  fprintf(logfile, " " INT64_FORMAT, agg->skipped);
2506  }
2507  fputc('\n', logfile);
2508 
2509  /* reset data and move to next interval */
2510  initStats(agg, agg->start_time + agg_interval);
2511  }
2512 
2513  /* accumulate the current transaction */
2514  accumStats(agg, skipped, latency, lag);
2515  }
2516  else
2517  {
2518  /* no, print raw transactions */
2519  struct timeval tv;
2520 
2521  gettimeofday(&tv, NULL);
2522  if (skipped)
2523  fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
2524  st->id, st->cnt, st->use_file,
2525  (long) tv.tv_sec, (long) tv.tv_usec);
2526  else
2527  fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
2528  st->id, st->cnt, latency, st->use_file,
2529  (long) tv.tv_sec, (long) tv.tv_usec);
2530  if (throttle_delay)
2531  fprintf(logfile, " %.0f", lag);
2532  fputc('\n', logfile);
2533  }
2534 }
2535 
2536 /*
2537  * Accumulate and report statistics at end of a transaction.
2538  *
2539  * (This is also called when a transaction is late and thus skipped.
2540  * Note that even skipped transactions are counted in the "cnt" fields.)
2541  */
2542 static void
2544  bool skipped, StatsData *agg)
2545 {
2546  double latency = 0.0,
2547  lag = 0.0;
2548  bool thread_details = progress || throttle_delay || latency_limit,
2549  detailed = thread_details || use_log || per_script_stats;
2550 
2551  if (detailed && !skipped)
2552  {
2553  if (INSTR_TIME_IS_ZERO(*now))
2554  INSTR_TIME_SET_CURRENT(*now);
2555 
2556  /* compute latency & lag */
2557  latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
2559  }
2560 
2561  if (thread_details)
2562  {
2563  /* keep detailed thread stats */
2564  accumStats(&thread->stats, skipped, latency, lag);
2565 
2566  /* count transactions over the latency limit, if needed */
2567  if (latency_limit && latency > latency_limit)
2568  thread->latency_late++;
2569  }
2570  else
2571  {
2572  /* no detailed stats, just count */
2573  thread->stats.cnt++;
2574  }
2575 
2576  /* client stat is just counting */
2577  st->cnt++;
2578 
2579  if (use_log)
2580  doLog(thread, st, agg, skipped, latency, lag);
2581 
2582  /* XXX could use a mutex here, but we choose not to */
2583  if (per_script_stats)
2584  accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
2585 }
2586 
2587 
2588 /* discard connections */
2589 static void
2591 {
2592  int i;
2593 
2594  for (i = 0; i < length; i++)
2595  {
2596  if (state[i].con)
2597  {
2598  PQfinish(state[i].con);
2599  state[i].con = NULL;
2600  }
2601  }
2602 }
2603 
2604 /* create tables and setup data */
2605 static void
2606 init(bool is_no_vacuum)
2607 {
2608 /*
2609  * The scale factor at/beyond which 32-bit integers are insufficient for
2610  * storing TPC-B account IDs.
2611  *
2612  * Although the actual threshold is 21474, we use 20000 because it is easier to
2613  * document and remember, and isn't that far away from the real threshold.
2614  */
2615 #define SCALE_32BIT_THRESHOLD 20000
2616 
2617  /*
2618  * Note: TPC-B requires at least 100 bytes per row, and the "filler"
2619  * fields in these table declarations were intended to comply with that.
2620  * The pgbench_accounts table complies with that because the "filler"
2621  * column is set to blank-padded empty string. But for all other tables
2622  * the columns default to NULL and so don't actually take any space. We
2623  * could fix that by giving them non-null default values. However, that
2624  * would completely break comparability of pgbench results with prior
2625  * versions. Since pgbench has never pretended to be fully TPC-B compliant
2626  * anyway, we stick with the historical behavior.
2627  */
2628  struct ddlinfo
2629  {
2630  const char *table; /* table name */
2631  const char *smcols; /* column decls if accountIDs are 32 bits */
2632  const char *bigcols; /* column decls if accountIDs are 64 bits */
2633  int declare_fillfactor;
2634  };
2635  static const struct ddlinfo DDLs[] = {
2636  {
2637  "pgbench_history",
2638  "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
2639  "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
2640  0
2641  },
2642  {
2643  "pgbench_tellers",
2644  "tid int not null,bid int,tbalance int,filler char(84)",
2645  "tid int not null,bid int,tbalance int,filler char(84)",
2646  1
2647  },
2648  {
2649  "pgbench_accounts",
2650  "aid int not null,bid int,abalance int,filler char(84)",
2651  "aid bigint not null,bid int,abalance int,filler char(84)",
2652  1
2653  },
2654  {
2655  "pgbench_branches",
2656  "bid int not null,bbalance int,filler char(88)",
2657  "bid int not null,bbalance int,filler char(88)",
2658  1
2659  }
2660  };
2661  static const char *const DDLINDEXes[] = {
2662  "alter table pgbench_branches add primary key (bid)",
2663  "alter table pgbench_tellers add primary key (tid)",
2664  "alter table pgbench_accounts add primary key (aid)"
2665  };
2666  static const char *const DDLKEYs[] = {
2667  "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
2668  "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
2669  "alter table pgbench_history add foreign key (bid) references pgbench_branches",
2670  "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
2671  "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
2672  };
2673 
2674  PGconn *con;
2675  PGresult *res;
2676  char sql[256];
2677  int i;
2678  int64 k;
2679 
2680  /* used to track elapsed time and estimate of the remaining time */
2681  instr_time start,
2682  diff;
2683  double elapsed_sec,
2684  remaining_sec;
2685  int log_interval = 1;
2686 
2687  if ((con = doConnect()) == NULL)
2688  exit(1);
2689 
2690  for (i = 0; i < lengthof(DDLs); i++)
2691  {
2692  char opts[256];
2693  char buffer[256];
2694  const struct ddlinfo *ddl = &DDLs[i];
2695  const char *cols;
2696 
2697  /* Remove old table, if it exists. */
2698  snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
2699  executeStatement(con, buffer);
2700 
2701  /* Construct new create table statement. */
2702  opts[0] = '\0';
2703  if (ddl->declare_fillfactor)
2704  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2705  " with (fillfactor=%d)", fillfactor);
2706  if (tablespace != NULL)
2707  {
2708  char *escape_tablespace;
2709 
2710  escape_tablespace = PQescapeIdentifier(con, tablespace,
2711  strlen(tablespace));
2712  snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
2713  " tablespace %s", escape_tablespace);
2714  PQfreemem(escape_tablespace);
2715  }
2716 
2717  cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
2718 
2719  snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
2720  unlogged_tables ? " unlogged" : "",
2721  ddl->table, cols, opts);
2722 
2723  executeStatement(con, buffer);
2724  }
2725 
2726  executeStatement(con, "begin");
2727 
2728  for (i = 0; i < nbranches * scale; i++)
2729  {
2730  /* "filler" column defaults to NULL */
2731  snprintf(sql, sizeof(sql),
2732  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2733  i + 1);
2734  executeStatement(con, sql);
2735  }
2736 
2737  for (i = 0; i < ntellers * scale; i++)
2738  {
2739  /* "filler" column defaults to NULL */
2740  snprintf(sql, sizeof(sql),
2741  "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2742  i + 1, i / ntellers + 1);
2743  executeStatement(con, sql);
2744  }
2745 
2746  executeStatement(con, "commit");
2747 
2748  /*
2749  * fill the pgbench_accounts table with some data
2750  */
2751  fprintf(stderr, "creating tables...\n");
2752 
2753  executeStatement(con, "begin");
2754  executeStatement(con, "truncate pgbench_accounts");
2755 
2756  res = PQexec(con, "copy pgbench_accounts from stdin");
2757  if (PQresultStatus(res) != PGRES_COPY_IN)
2758  {
2759  fprintf(stderr, "%s", PQerrorMessage(con));
2760  exit(1);
2761  }
2762  PQclear(res);
2763 
2764  INSTR_TIME_SET_CURRENT(start);
2765 
2766  for (k = 0; k < (int64) naccounts * scale; k++)
2767  {
2768  int64 j = k + 1;
2769 
2770  /* "filler" column defaults to blank padded empty string */
2771  snprintf(sql, sizeof(sql),
2772  INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2773  j, k / naccounts + 1, 0);
2774  if (PQputline(con, sql))
2775  {
2776  fprintf(stderr, "PQputline failed\n");
2777  exit(1);
2778  }
2779 
2780  /*
2781  * If we want to stick with the original logging, print a message each
2782  * 100k inserted rows.
2783  */
2784  if ((!use_quiet) && (j % 100000 == 0))
2785  {
2786  INSTR_TIME_SET_CURRENT(diff);
2787  INSTR_TIME_SUBTRACT(diff, start);
2788 
2789  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2790  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2791 
2792  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2793  j, (int64) naccounts * scale,
2794  (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2795  elapsed_sec, remaining_sec);
2796  }
2797  /* let's not call the timing for each row, but only each 100 rows */
2798  else if (use_quiet && (j % 100 == 0))
2799  {
2800  INSTR_TIME_SET_CURRENT(diff);
2801  INSTR_TIME_SUBTRACT(diff, start);
2802 
2803  elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2804  remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2805 
2806  /* have we reached the next interval (or end)? */
2807  if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2808  {
2809  fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2810  j, (int64) naccounts * scale,
2811  (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2812 
2813  /* skip to the next interval */
2814  log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2815  }
2816  }
2817 
2818  }
2819  if (PQputline(con, "\\.\n"))
2820  {
2821  fprintf(stderr, "very last PQputline failed\n");
2822  exit(1);
2823  }
2824  if (PQendcopy(con))
2825  {
2826  fprintf(stderr, "PQendcopy failed\n");
2827  exit(1);
2828  }
2829  executeStatement(con, "commit");
2830 
2831  /* vacuum */
2832  if (!is_no_vacuum)
2833  {
2834  fprintf(stderr, "vacuum...\n");
2835  executeStatement(con, "vacuum analyze pgbench_branches");
2836  executeStatement(con, "vacuum analyze pgbench_tellers");
2837  executeStatement(con, "vacuum analyze pgbench_accounts");
2838  executeStatement(con, "vacuum analyze pgbench_history");
2839  }
2840 
2841  /*
2842  * create indexes
2843  */
2844  fprintf(stderr, "set primary keys...\n");
2845  for (i = 0; i < lengthof(DDLINDEXes); i++)
2846  {
2847  char buffer[256];
2848 
2849  strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2850 
2851  if (index_tablespace != NULL)
2852  {
2853  char *escape_tablespace;
2854 
2855  escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2856  strlen(index_tablespace));
2857  snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2858  " using index tablespace %s", escape_tablespace);
2859  PQfreemem(escape_tablespace);
2860  }
2861 
2862  executeStatement(con, buffer);
2863  }
2864 
2865  /*
2866  * create foreign keys
2867  */
2868  if (foreign_keys)
2869  {
2870  fprintf(stderr, "set foreign keys...\n");
2871  for (i = 0; i < lengthof(DDLKEYs); i++)
2872  {
2873  executeStatement(con, DDLKEYs[i]);
2874  }
2875  }
2876 
2877  fprintf(stderr, "done.\n");
2878  PQfinish(con);
2879 }
2880 
2881 /*
2882  * Replace :param with $n throughout the command's SQL text, which
2883  * is a modifiable string in cmd->argv[0].
2884  */
2885 static bool
2887 {
2888  char *sql,
2889  *p;
2890 
2891  /* We don't want to scribble on cmd->argv[0] until done */
2892  sql = pg_strdup(cmd->argv[0]);
2893 
2894  cmd->argc = 1;
2895 
2896  p = sql;
2897  while ((p = strchr(p, ':')) != NULL)
2898  {
2899  char var[12];
2900  char *name;
2901  int eaten;
2902 
2903  name = parseVariable(p, &eaten);
2904  if (name == NULL)
2905  {
2906  while (*p == ':')
2907  {
2908  p++;
2909  }
2910  continue;
2911  }
2912 
2913  if (cmd->argc >= MAX_ARGS)
2914  {
2915  fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
2916  MAX_ARGS - 1, cmd->argv[0]);
2917  pg_free(name);
2918  return false;
2919  }
2920 
2921  sprintf(var, "$%d", cmd->argc);
2922  p = replaceVariable(&sql, p, eaten, var);
2923 
2924  cmd->argv[cmd->argc] = name;
2925  cmd->argc++;
2926  }
2927 
2928  pg_free(cmd->argv[0]);
2929  cmd->argv[0] = sql;
2930  return true;
2931 }
2932 
/*
 * Minimal printf-style error reporter; the lexer may need to call it.
 *
 * stdout is flushed first, then the (translated) format is written to
 * stderr with the supplied arguments.
 */
static void
pgbench_error(const char *fmt,...)
{
	va_list		args;

	fflush(stdout);

	va_start(args, fmt);
	vfprintf(stderr, _(fmt), args);
	va_end(args);
}
2946 
/*
 * Report a syntax error found while parsing a script and exit(1).
 *
 * (In practice this is reached while parsing a backslash command, because
 * syntax errors in SQL are not detected here.)
 *
 * source:  source of script (filename or builtin-script ID)
 * lineno:  line number within script (count from 1)
 * line:    whole line of backslash command, if available
 * command: backslash command name, if available
 * msg:     the actual error message
 * more:    optional extra message
 * column:  zero-based column number, or -1 if unknown
 */
void
syntax_error(const char *source, int lineno,
			 const char *line, const char *command,
			 const char *msg, const char *more, int column)
{
	bool		have_line = (line != NULL);
	bool		have_column = (column >= 0);

	fprintf(stderr, "%s:%d: %s", source, lineno, msg);
	if (more != NULL)
		fprintf(stderr, " (%s)", more);

	/* the column is spelled out only when no line can be shown with a caret */
	if (have_column && !have_line)
		fprintf(stderr, " at column %d", column + 1);
	if (command != NULL)
		fprintf(stderr, " in command \"%s\"", command);
	fputc('\n', stderr);

	if (have_line)
	{
		fprintf(stderr, "%s\n", line);

		/* pad with "column" blanks so the caret sits under the offender */
		if (have_column)
			fprintf(stderr, "%*s^ error found here\n", column, "");
	}

	exit(1);
}
2986 
2987 /*
2988  * Parse a SQL command; return a Command struct, or NULL if it's a comment
2989  *
2990  * On entry, psqlscan.l has collected the command into "buf", so we don't
2991  * really need to do much here except check for comment and set up a
2992  * Command struct.
2993  */
2994 static Command *
2996 {
2997  Command *my_command;
2998  char *p;
2999  char *nlpos;
3000 
3001  /* Skip any leading whitespace, as well as "--" style comments */
3002  p = buf->data;
3003  for (;;)
3004  {
3005  if (isspace((unsigned char) *p))
3006  p++;
3007  else if (strncmp(p, "--", 2) == 0)
3008  {
3009  p = strchr(p, '\n');
3010  if (p == NULL)
3011  return NULL;
3012  p++;
3013  }
3014  else
3015  break;
3016  }
3017 
3018  /* If there's nothing but whitespace and comments, we're done */
3019  if (*p == '\0')
3020  return NULL;
3021 
3022  /* Allocate and initialize Command structure */
3023  my_command = (Command *) pg_malloc0(sizeof(Command));
3024  my_command->command_num = num_commands++;
3025  my_command->type = SQL_COMMAND;
3026  initSimpleStats(&my_command->stats);
3027 
3028  /*
3029  * Install query text as the sole argv string. If we are using a
3030  * non-simple query mode, we'll extract parameters from it later.
3031  */
3032  my_command->argv[0] = pg_strdup(p);
3033  my_command->argc = 1;
3034 
3035  /*
3036  * If SQL command is multi-line, we only want to save the first line as
3037  * the "line" label.
3038  */
3039  nlpos = strchr(p, '\n');
3040  if (nlpos)
3041  {
3042  my_command->line = pg_malloc(nlpos - p + 1);
3043  memcpy(my_command->line, p, nlpos - p);
3044  my_command->line[nlpos - p] = '\0';
3045  }
3046  else
3047  my_command->line = pg_strdup(p);
3048 
3049  return my_command;
3050 }
3051 
3052 /*
3053  * Parse a backslash command; return a Command struct, or NULL if comment
3054  *
3055  * At call, we have scanned only the initial backslash.
3056  */
3057 static Command *
3058 process_backslash_command(PsqlScanState sstate, const char *source)
3059 {
3060  Command *my_command;
3061  PQExpBufferData word_buf;
3062  int word_offset;
3063  int offsets[MAX_ARGS]; /* offsets of argument words */
3064  int start_offset;
3065  int lineno;
3066  int j;
3067 
3068  initPQExpBuffer(&word_buf);
3069 
3070  /* Remember location of the backslash */
3071  start_offset = expr_scanner_offset(sstate) - 1;
3072  lineno = expr_scanner_get_lineno(sstate, start_offset);
3073 
3074  /* Collect first word of command */
3075  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
3076  {
3077  termPQExpBuffer(&word_buf);
3078  return NULL;
3079  }
3080 
3081  /* Allocate and initialize Command structure */
3082  my_command = (Command *) pg_malloc0(sizeof(Command));
3083  my_command->command_num = num_commands++;
3084  my_command->type = META_COMMAND;
3085  my_command->argc = 0;
3086  initSimpleStats(&my_command->stats);
3087 
3088  /* Save first word (command name) */
3089  j = 0;
3090  offsets[j] = word_offset;
3091  my_command->argv[j++] = pg_strdup(word_buf.data);
3092  my_command->argc++;
3093 
3094  if (pg_strcasecmp(my_command->argv[0], "set") == 0)
3095  {
3096  /* For \set, collect var name, then lex the expression. */
3098 
3099  if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
3100  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3101  "missing argument", NULL, -1);
3102 
3103  offsets[j] = word_offset;
3104  my_command->argv[j++] = pg_strdup(word_buf.data);
3105  my_command->argc++;
3106 
3107  yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
3108  my_command->argv[0]);
3109 
3110  if (expr_yyparse(yyscanner) != 0)
3111  {
3112  /* dead code: exit done from syntax_error called by yyerror */
3113  exit(1);
3114  }
3115 
3116  my_command->expr = expr_parse_result;
3117 
3118  /* Save line, trimming any trailing newline */
3119  my_command->line = expr_scanner_get_substring(sstate,
3120  start_offset,
3121  expr_scanner_offset(sstate),
3122  true);
3123 
3124  expr_scanner_finish(yyscanner);
3125 
3126  termPQExpBuffer(&word_buf);
3127 
3128  return my_command;
3129  }
3130 
3131  /* For all other commands, collect remaining words. */
3132  while (expr_lex_one_word(sstate, &word_buf, &word_offset))
3133  {
3134  if (j >= MAX_ARGS)
3135  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3136  "too many arguments", NULL, -1);
3137 
3138  offsets[j] = word_offset;
3139  my_command->argv[j++] = pg_strdup(word_buf.data);
3140  my_command->argc++;
3141  }
3142 
3143  /* Save line, trimming any trailing newline */
3144  my_command->line = expr_scanner_get_substring(sstate,
3145  start_offset,
3146  expr_scanner_offset(sstate),
3147  true);
3148 
3149  if (pg_strcasecmp(my_command->argv[0], "sleep") == 0)
3150  {
3151  if (my_command->argc < 2)
3152  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3153  "missing argument", NULL, -1);
3154 
3155  if (my_command->argc > 3)
3156  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3157  "too many arguments", NULL,
3158  offsets[3] - start_offset);
3159 
3160  /*
3161  * Split argument into number and unit to allow "sleep 1ms" etc. We
3162  * don't have to terminate the number argument with null because it
3163  * will be parsed with atoi, which ignores trailing non-digit
3164  * characters.
3165  */
3166  if (my_command->argc == 2 && my_command->argv[1][0] != ':')
3167  {
3168  char *c = my_command->argv[1];
3169 
3170  while (isdigit((unsigned char) *c))
3171  c++;
3172  if (*c)
3173  {
3174  my_command->argv[2] = c;
3175  offsets[2] = offsets[1] + (c - my_command->argv[1]);
3176  my_command->argc = 3;
3177  }
3178  }
3179 
3180  if (my_command->argc == 3)
3181  {
3182  if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
3183  pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
3184  pg_strcasecmp(my_command->argv[2], "s") != 0)
3185  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3186  "unrecognized time unit, must be us, ms or s",
3187  my_command->argv[2], offsets[2] - start_offset);
3188  }
3189  }
3190  else if (pg_strcasecmp(my_command->argv[0], "setshell") == 0)
3191  {
3192  if (my_command->argc < 3)
3193  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3194  "missing argument", NULL, -1);
3195  }
3196  else if (pg_strcasecmp(my_command->argv[0], "shell") == 0)
3197  {
3198  if (my_command->argc < 2)
3199  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3200  "missing command", NULL, -1);
3201  }
3202  else
3203  {
3204  syntax_error(source, lineno, my_command->line, my_command->argv[0],
3205  "invalid command", NULL, -1);
3206  }
3207 
3208  termPQExpBuffer(&word_buf);
3209 
3210  return my_command;
3211 }
3212 
3213 /*
3214  * Parse a script (either the contents of a file, or a built-in script)
3215  * and add it to the list of scripts.
3216  */
3217 static void
3218 ParseScript(const char *script, const char *desc, int weight)
3219 {
3220  ParsedScript ps;
3221  PsqlScanState sstate;
3222  PQExpBufferData line_buf;
3223  int alloc_num;
3224  int index;
3225 
3226 #define COMMANDS_ALLOC_NUM 128
3227  alloc_num = COMMANDS_ALLOC_NUM;
3228 
3229  /* Initialize all fields of ps */
3230  ps.desc = desc;
3231  ps.weight = weight;
3232  ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
3233  initStats(&ps.stats, 0);
3234 
3235  /* Prepare to parse script */
3237 
3238  /*
3239  * Ideally, we'd scan scripts using the encoding and stdstrings settings
3240  * we get from a DB connection. However, without major rearrangement of
3241  * pgbench's argument parsing, we can't have a DB connection at the time
3242  * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
3243  * with any backend-safe encoding, though conceivably we could be fooled
3244  * if a script file uses a client-only encoding. We also assume that
3245  * stdstrings should be true, which is a bit riskier.
3246  */
3247  psql_scan_setup(sstate, script, strlen(script), 0, true);
3248 
3249  initPQExpBuffer(&line_buf);
3250 
3251  index = 0;
3252 
3253  for (;;)
3254  {
3255  PsqlScanResult sr;
3256  promptStatus_t prompt;
3257  Command *command;
3258 
3259  resetPQExpBuffer(&line_buf);
3260 
3261  sr = psql_scan(sstate, &line_buf, &prompt);
3262 
3263  /* If we collected a SQL command, process that */
3264  command = process_sql_command(&line_buf, desc);
3265  if (command)
3266  {
3267  ps.commands[index] = command;
3268  index++;
3269 
3270  if (index >= alloc_num)
3271  {
3272  alloc_num += COMMANDS_ALLOC_NUM;
3273  ps.commands = (Command **)
3274  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3275  }
3276  }
3277 
3278  /* If we reached a backslash, process that */
3279  if (sr == PSCAN_BACKSLASH)
3280  {
3281  command = process_backslash_command(sstate, desc);
3282  if (command)
3283  {
3284  ps.commands[index] = command;
3285  index++;
3286 
3287  if (index >= alloc_num)
3288  {
3289  alloc_num += COMMANDS_ALLOC_NUM;
3290  ps.commands = (Command **)
3291  pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
3292  }
3293  }
3294  }
3295 
3296  /* Done if we reached EOF */
3297  if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
3298  break;
3299  }
3300 
3301  ps.commands[index] = NULL;
3302 
3303  addScript(ps);
3304 
3305  termPQExpBuffer(&line_buf);
3306  psql_scan_finish(sstate);
3307  psql_scan_destroy(sstate);
3308 }
3309 
/*
 * Read the entire contents of file fd, and return it in a malloc'd buffer.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the script.
 *
 * The returned buffer is NUL-terminated.  Reading stops at the first short
 * read, which may be EOF or an error; this function does not distinguish
 * them, so the caller must check ferror(fd) afterwards.
 */
static char *
read_file_contents(FILE *fd)
{
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	buf = (char *) pg_malloc(buflen);

	for (;;)
	{
		size_t		nread;

		/* buflen - used is always exactly BUFSIZ at this point */
		nread = fread(buf + used, 1, BUFSIZ, fd);
		used += nread;
		/* If fread() read less than requested, must be EOF or error */
		if (nread < BUFSIZ)
			break;
		/* Enlarge buf so we can read some more */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}
	/* There is surely room for a terminator */
	buf[used] = '\0';

	return buf;
}
3343 
/*
 * Load the script stored in the named file and append it to the script
 * list.  The special name "-" selects standard input.
 *
 * NB: filename must be storage that won't disappear, since it is passed on
 * as the script's description.
 *
 * Failure to open or read the file is reported on stderr and ends the
 * program with exit(1).
 */
static void
process_file(const char *filename, int weight)
{
	bool		from_stdin = (strcmp(filename, "-") == 0);
	FILE	   *in;
	char	   *text;

	/* pick the input stream */
	if (from_stdin)
		in = stdin;
	else
	{
		in = fopen(filename, "r");
		if (in == NULL)
		{
			fprintf(stderr, "could not open file \"%s\": %s\n",
					filename, strerror(errno));
			exit(1);
		}
	}

	/* pull the whole file into memory, then see whether reading failed */
	text = read_file_contents(in);

	if (ferror(in))
	{
		fprintf(stderr, "could not read file \"%s\": %s\n",
				filename, strerror(errno));
		exit(1);
	}

	/* stdin is left open; anything we opened ourselves gets closed */
	if (!from_stdin)
		fclose(in);

	ParseScript(text, filename, weight);

	free(text);
}
3381 
3382 /* Parse the given builtin script and add it to the list. */
3383 static void
3384 process_builtin(const BuiltinScript *bi, int weight)
3385 {
3386  ParseScript(bi->script, bi->desc, weight);
3387 }
3388 
3389 /* show available builtin scripts */
3390 static void
3392 {
3393  int i;
3394 
3395  fprintf(stderr, "Available builtin scripts:\n");
3396  for (i = 0; i < lengthof(builtin_script); i++)
3397  fprintf(stderr, "\t%s\n", builtin_script[i].name);
3398  fprintf(stderr, "\n");
3399 }
3400 
3401 /* return builtin script "name" if unambiguous, fails if not found */
3402 static const BuiltinScript *
3403 findBuiltin(const char *name)
3404 {
3405  int i,
3406  found = 0,
3407  len = strlen(name);
3408  const BuiltinScript *result = NULL;
3409 
3410  for (i = 0; i < lengthof(builtin_script); i++)
3411  {
3412  if (strncmp(builtin_script[i].name, name, len) == 0)
3413  {
3414  result = &builtin_script[i];
3415  found++;
3416  }
3417  }
3418 
3419  /* ok, unambiguous result */
3420  if (found == 1)
3421  return result;
3422 
3423  /* error cases */
3424  if (found == 0)
3425  fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
3426  else /* found > 1 */
3427  fprintf(stderr,
3428  "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
3429 
3431  exit(1);
3432 }
3433 
3434 /*
3435  * Determine the weight specification from a script option (-b, -f), if any,
3436  * and return it as an integer (1 is returned if there's no weight). The
3437  * script name is returned in *script as a malloc'd string.
3438  */
3439 static int
3440 parseScriptWeight(const char *option, char **script)
3441 {
3442  char *sep;
3443  int weight;
3444 
3445  if ((sep = strrchr(option, WSEP)))
3446  {
3447  int namelen = sep - option;
3448  long wtmp;
3449  char *badp;
3450 
3451  /* generate the script name */
3452  *script = pg_malloc(namelen + 1);
3453  strncpy(*script, option, namelen);
3454  (*script)[namelen] = '\0';
3455 
3456  /* process digits of the weight spec */
3457  errno = 0;
3458  wtmp = strtol(sep + 1, &badp, 10);
3459  if (errno != 0 || badp == sep + 1 || *badp != '\0')
3460  {
3461  fprintf(stderr, "invalid weight specification: %s\n", sep);
3462  exit(1);
3463  }
3464  if (wtmp > INT_MAX || wtmp < 0)
3465  {
3466  fprintf(stderr,
3467  "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
3468  INT_MAX, (int64) wtmp);
3469  exit(1);
3470  }
3471  weight = wtmp;
3472  }
3473  else
3474  {
3475  *script = pg_strdup(option);
3476  weight = 1;
3477  }
3478 
3479  return weight;
3480 }
3481 
3482 /* append a script to the list of scripts to process */
3483 static void
3485 {
3486  if (script.commands == NULL || script.commands[0] == NULL)
3487  {
3488  fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
3489  exit(1);
3490  }
3491 
3492  if (num_scripts >= MAX_SCRIPTS)
3493  {
3494  fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
3495  exit(1);
3496  }
3497 
3498  sql_script[num_scripts] = script;
3499  num_scripts++;
3500 }
3501 
3502 static void
3503 printSimpleStats(char *prefix, SimpleStats *ss)
3504 {
3505  /* print NaN if no transactions where executed */
3506  double latency = ss->sum / ss->count;
3507  double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
3508 
3509  printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
3510  printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
3511 }
3512 
3513 /* print out results */
3514 static void
3515 printResults(TState *threads, StatsData *total, instr_time total_time,
3516  instr_time conn_total_time, int latency_late)
3517 {
3518  double time_include,
3519  tps_include,
3520  tps_exclude;
3521 
3522  time_include = INSTR_TIME_GET_DOUBLE(total_time);
3523  tps_include = total->cnt / time_include;
3524  tps_exclude = total->cnt / (time_include -
3525  (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
3526 
3527  /* Report test parameters. */
3528  printf("transaction type: %s\n",
3529  num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
3530  printf("scaling factor: %d\n", scale);
3531  printf("query mode: %s\n", QUERYMODE[querymode]);
3532  printf("number of clients: %d\n", nclients);
3533  printf("number of threads: %d\n", nthreads);
3534  if (duration <= 0)
3535  {
3536  printf("number of transactions per client: %d\n", nxacts);
3537  printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
3538  total->cnt - total->skipped, nxacts * nclients);
3539  }
3540  else
3541  {
3542  printf("duration: %d s\n", duration);
3543  printf("number of transactions actually processed: " INT64_FORMAT "\n",
3544  total->cnt);
3545  }
3546 
3547  /* Remaining stats are nonsensical if we failed to execute any xacts */
3548  if (total->cnt <= 0)
3549  return;
3550 
3551  if (throttle_delay && latency_limit)
3552  printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
3553  total->skipped,
3554  100.0 * total->skipped / total->cnt);
3555 
3556  if (latency_limit)
3557  printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
3558  latency_limit / 1000.0, latency_late,
3559  100.0 * latency_late / total->cnt);
3560 
3561  if (throttle_delay || progress || latency_limit)
3562  printSimpleStats("latency", &total->latency);
3563  else
3564  {
3565  /* no measurement, show average latency computed from run time */
3566  printf("latency average = %.3f ms\n",
3567  1000.0 * time_include * nclients / total->cnt);
3568  }
3569 
3570  if (throttle_delay)
3571  {
3572  /*
3573  * Report average transaction lag under rate limit throttling. This
3574  * is the delay between scheduled and actual start times for the
3575  * transaction. The measured lag may be caused by thread/client load,
3576  * the database load, or the Poisson throttling process.
3577  */
3578  printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
3579  0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
3580  }
3581 
3582  printf("tps = %f (including connections establishing)\n", tps_include);
3583  printf("tps = %f (excluding connections establishing)\n", tps_exclude);
3584 
3585  /* Report per-script/command statistics */
3586  if (per_script_stats || latency_limit || is_latencies)
3587  {
3588  int i;
3589 
3590  for (i = 0; i < num_scripts; i++)
3591  {
3592  if (num_scripts > 1)
3593  printf("SQL script %d: %s\n"
3594  " - weight: %d (targets %.1f%% of total)\n"
3595  " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
3596  i + 1, sql_script[i].desc,
3597  sql_script[i].weight,
3598  100.0 * sql_script[i].weight / total_weight,
3599  sql_script[i].stats.cnt,
3600  100.0 * sql_script[i].stats.cnt / total->cnt,
3601  sql_script[i].stats.cnt / time_include);
3602  else
3603  printf("script statistics:\n");
3604 
3605  if (latency_limit)
3606  printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
3607  sql_script[i].stats.skipped,
3608  100.0 * sql_script[i].stats.skipped / sql_script[i].stats.cnt);
3609 
3610  if (num_scripts > 1)
3611  printSimpleStats(" - latency", &sql_script[i].stats.latency);
3612 
3613  /* Report per-command latencies */
3614  if (is_latencies)
3615  {
3616  Command **commands;
3617 
3618  printf(" - statement latencies in milliseconds:\n");
3619 
3620  for (commands = sql_script[i].commands;
3621  *commands != NULL;
3622  commands++)
3623  printf(" %11.3f %s\n",
3624  1000.0 * (*commands)->stats.sum /
3625  (*commands)->stats.count,
3626  (*commands)->line);
3627  }
3628  }
3629  }
3630 }
3631 
3632 
3633 int
3634 main(int argc, char **argv)
3635 {
3636  static struct option long_options[] = {
3637  /* systematic long/short named options */
3638  {"builtin", required_argument, NULL, 'b'},
3639  {"client", required_argument, NULL, 'c'},
3640  {"connect", no_argument, NULL, 'C'},
3641  {"debug", no_argument, NULL, 'd'},
3642  {"define", required_argument, NULL, 'D'},
3643  {"file", required_argument, NULL, 'f'},
3644  {"fillfactor", required_argument, NULL, 'F'},
3645  {"host", required_argument, NULL, 'h'},
3646  {"initialize", no_argument, NULL, 'i'},
3647  {"jobs", required_argument, NULL, 'j'},
3648  {"log", no_argument, NULL, 'l'},
3649  {"latency-limit", required_argument, NULL, 'L'},
3650  {"no-vacuum", no_argument, NULL, 'n'},
3651  {"port", required_argument, NULL, 'p'},
3652  {"progress", required_argument, NULL, 'P'},
3653  {"protocol", required_argument, NULL, 'M'},
3654  {"quiet", no_argument, NULL, 'q'},
3655  {"report-latencies", no_argument, NULL, 'r'},
3656  {"rate", required_argument, NULL, 'R'},
3657  {"scale", required_argument, NULL, 's'},
3658  {"select-only", no_argument, NULL, 'S'},
3659  {"skip-some-updates", no_argument, NULL, 'N'},
3660  {"time", required_argument, NULL, 'T'},
3661  {"transactions", required_argument, NULL, 't'},
3662  {"username", required_argument, NULL, 'U'},
3663  {"vacuum-all", no_argument, NULL, 'v'},
3664  /* long-named only options */
3665  {"foreign-keys", no_argument, &foreign_keys, 1},
3666  {"index-tablespace", required_argument, NULL, 3},
3667  {"tablespace", required_argument, NULL, 2},
3668  {"unlogged-tables", no_argument, &unlogged_tables, 1},
3669  {"sampling-rate", required_argument, NULL, 4},
3670  {"aggregate-interval", required_argument, NULL, 5},
3671  {"progress-timestamp", no_argument, NULL, 6},
3672  {"log-prefix", required_argument, NULL, 7},
3673  {NULL, 0, NULL, 0}
3674  };
3675 
3676  int c;
3677  int is_init_mode = 0; /* initialize mode? */
3678  int is_no_vacuum = 0; /* no vacuum at all before testing? */
3679  int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
3680  int optindex;
3681  bool scale_given = false;
3682 
3683  bool benchmarking_option_set = false;
3684  bool initialization_option_set = false;
3685  bool internal_script_used = false;
3686 
3687  CState *state; /* status of clients */
3688  TState *threads; /* array of thread */
3689 
3690  instr_time start_time; /* start up time */
3691  instr_time total_time;
3692  instr_time conn_total_time;
3693  int64 latency_late = 0;
3694  StatsData stats;
3695  int weight;
3696 
3697  int i;
3698  int nclients_dealt;
3699 
3700 #ifdef HAVE_GETRLIMIT
3701  struct rlimit rlim;
3702 #endif
3703 
3704  PGconn *con;
3705  PGresult *res;
3706  char *env;
3707 
3708  progname = get_progname(argv[0]);
3709 
3710  if (argc > 1)
3711  {
3712  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
3713  {
3714  usage();
3715  exit(0);
3716  }
3717  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
3718  {
3719  puts("pgbench (PostgreSQL) " PG_VERSION);
3720  exit(0);
3721  }
3722  }
3723 
3724 #ifdef WIN32
3725  /* stderr is buffered on Win32. */
3726  setvbuf(stderr, NULL, _IONBF, 0);
3727 #endif
3728 
3729  if ((env = getenv("PGHOST")) != NULL && *env != '\0')
3730  pghost = env;
3731  if ((env = getenv("PGPORT")) != NULL && *env != '\0')
3732  pgport = env;
3733  else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
3734  login = env;
3735 
3736  state = (CState *) pg_malloc(sizeof(CState));
3737  memset(state, 0, sizeof(CState));
3738 
3739  while ((c = getopt_long(argc, argv, "ih:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
3740  {
3741  char *script;
3742 
3743  switch (c)
3744  {
3745  case 'i':
3746  is_init_mode++;
3747  break;
3748  case 'h':
3749  pghost = pg_strdup(optarg);
3750  break;
3751  case 'n':
3752  is_no_vacuum++;
3753  break;
3754  case 'v':
3755  do_vacuum_accounts++;
3756  break;
3757  case 'p':
3758  pgport = pg_strdup(optarg);
3759  break;
3760  case 'd':
3761  debug++;
3762  break;
3763  case 'c':
3764  benchmarking_option_set = true;
3765  nclients = atoi(optarg);
3766  if (nclients <= 0 || nclients > MAXCLIENTS)
3767  {
3768  fprintf(stderr, "invalid number of clients: \"%s\"\n",
3769  optarg);
3770  exit(1);
3771  }
3772 #ifdef HAVE_GETRLIMIT
3773 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
3774  if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
3775 #else /* but BSD doesn't ... */
3776  if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
3777 #endif /* RLIMIT_NOFILE */
3778  {
3779  fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
3780  exit(1);
3781  }
3782  if (rlim.rlim_cur < nclients + 3)
3783  {
3784  fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
3785  nclients + 3, (long) rlim.rlim_cur);
3786  fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
3787  exit(1);
3788  }
3789 #endif /* HAVE_GETRLIMIT */
3790  break;
3791  case 'j': /* jobs */
3792  benchmarking_option_set = true;
3793  nthreads = atoi(optarg);
3794  if (nthreads <= 0)
3795  {
3796  fprintf(stderr, "invalid number of threads: \"%s\"\n",
3797  optarg);
3798  exit(1);
3799  }
3800 #ifndef ENABLE_THREAD_SAFETY
3801  if (nthreads != 1)
3802  {
3803  fprintf(stderr, "threads are not supported on this platform; use -j1\n");
3804  exit(1);
3805  }
3806 #endif /* !ENABLE_THREAD_SAFETY */
3807  break;
3808  case 'C':
3809  benchmarking_option_set = true;
3810  is_connect = true;
3811  break;
3812  case 'r':
3813  benchmarking_option_set = true;
3814  per_script_stats = true;
3815  is_latencies = true;
3816  break;
3817  case 's':
3818  scale_given = true;
3819  scale = atoi(optarg);
3820  if (scale <= 0)
3821  {
3822  fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
3823  exit(1);
3824  }
3825  break;
3826  case 't':
3827  benchmarking_option_set = true;
3828  if (duration > 0)
3829  {
3830  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3831  exit(1);
3832  }
3833  nxacts = atoi(optarg);
3834  if (nxacts <= 0)
3835  {
3836  fprintf(stderr, "invalid number of transactions: \"%s\"\n",
3837  optarg);
3838  exit(1);
3839  }
3840  break;
3841  case 'T':
3842  benchmarking_option_set = true;
3843  if (nxacts > 0)
3844  {
3845  fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
3846  exit(1);
3847  }
3848  duration = atoi(optarg);
3849  if (duration <= 0)
3850  {
3851  fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
3852  exit(1);
3853  }
3854  break;
3855  case 'U':
3856  login = pg_strdup(optarg);
3857  break;
3858  case 'l':
3859  benchmarking_option_set = true;
3860  use_log = true;
3861  break;
3862  case 'q':
3863  initialization_option_set = true;
3864  use_quiet = true;
3865  break;
3866 
3867  case 'b':
3868  if (strcmp(optarg, "list") == 0)
3869  {
3871  exit(0);
3872  }
3873 
3874  weight = parseScriptWeight(optarg, &script);
3875  process_builtin(findBuiltin(script), weight);
3876  benchmarking_option_set = true;
3877  internal_script_used = true;
3878  break;
3879 
3880  case 'S':
3881  process_builtin(findBuiltin("select-only"), 1);
3882  benchmarking_option_set = true;
3883  internal_script_used = true;
3884  break;
3885  case 'N':
3886  process_builtin(findBuiltin("simple-update"), 1);
3887  benchmarking_option_set = true;
3888  internal_script_used = true;
3889  break;
3890  case 'f':
3891  weight = parseScriptWeight(optarg, &script);
3892  process_file(script, weight);
3893  benchmarking_option_set = true;
3894  break;
3895  case 'D':
3896  {
3897  char *p;
3898 
3899  benchmarking_option_set = true;
3900 
3901  if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3902  {
3903  fprintf(stderr, "invalid variable definition: \"%s\"\n",
3904  optarg);
3905  exit(1);
3906  }
3907 
3908  *p++ = '\0';
3909  if (!putVariable(&state[0], "option", optarg, p))
3910  exit(1);
3911  }
3912  break;
3913  case 'F':
3914  initialization_option_set = true;
3915  fillfactor = atoi(optarg);
3916  if (fillfactor < 10 || fillfactor > 100)
3917  {
3918  fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3919  exit(1);
3920  }
3921  break;
3922  case 'M':
3923  benchmarking_option_set = true;
3924  for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3925  if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3926  break;
3927  if (querymode >= NUM_QUERYMODE)
3928  {
3929  fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3930  optarg);
3931  exit(1);
3932  }
3933  break;
3934  case 'P':
3935  benchmarking_option_set = true;
3936  progress = atoi(optarg);
3937  if (progress <= 0)
3938  {
3939  fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3940  optarg);
3941  exit(1);
3942  }
3943  break;
3944  case 'R':
3945  {
3946  /* get a double from the beginning of option value */
3947  double throttle_value = atof(optarg);
3948 
3949  benchmarking_option_set = true;
3950 
3951  if (throttle_value <= 0.0)
3952  {
3953  fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3954  exit(1);
3955  }
3956  /* Invert rate limit into a time offset */
3957  throttle_delay = (int64) (1000000.0 / throttle_value);
3958  }
3959  break;
3960  case 'L':
3961  {
3962  double limit_ms = atof(optarg);
3963 
3964  if (limit_ms <= 0.0)
3965  {
3966  fprintf(stderr, "invalid latency limit: \"%s\"\n",
3967  optarg);
3968  exit(1);
3969  }
3970  benchmarking_option_set = true;
3971  latency_limit = (int64) (limit_ms * 1000);
3972  }
3973  break;
3974  case 0:
3975  /* This covers long options which take no argument. */
3976  if (foreign_keys || unlogged_tables)
3977  initialization_option_set = true;
3978  break;
3979  case 2: /* tablespace */
3980  initialization_option_set = true;
3981  tablespace = pg_strdup(optarg);
3982  break;
3983  case 3: /* index-tablespace */
3984  initialization_option_set = true;
3985  index_tablespace = pg_strdup(optarg);
3986  break;
3987  case 4:
3988  benchmarking_option_set = true;
3989  sample_rate = atof(optarg);
3990  if (sample_rate <= 0.0 || sample_rate > 1.0)
3991  {
3992  fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3993  exit(1);
3994  }
3995  break;
3996  case 5:
3997  benchmarking_option_set = true;
3998  agg_interval = atoi(optarg);
3999  if (agg_interval <= 0)
4000  {
4001  fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
4002  optarg);
4003  exit(1);
4004  }
4005  break;
4006  case 6:
4007  progress_timestamp = true;
4008  benchmarking_option_set = true;
4009  break;
4010  case 7:
4011  benchmarking_option_set = true;
4012  logfile_prefix = pg_strdup(optarg);
4013  break;
4014  default:
4015  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
4016  exit(1);
4017  break;
4018  }
4019  }
4020 
4021  /* set default script if none */
4022  if (num_scripts == 0 && !is_init_mode)
4023  {
4024  process_builtin(findBuiltin("tpcb-like"), 1);
4025  benchmarking_option_set = true;
4026  internal_script_used = true;
4027  }
4028 
4029  /* if not simple query mode, parse the script(s) to find parameters */
4030  if (querymode != QUERY_SIMPLE)
4031  {
4032  for (i = 0; i < num_scripts; i++)
4033  {
4034  Command **commands = sql_script[i].commands;
4035  int j;
4036 
4037  for (j = 0; commands[j] != NULL; j++)
4038  {
4039  if (commands[j]->type != SQL_COMMAND)
4040  continue;
4041  if (!parseQuery(commands[j]))
4042  exit(1);
4043  }
4044  }
4045  }
4046 
4047  /* compute total_weight */
4048  for (i = 0; i < num_scripts; i++)
4049  /* cannot overflow: weight is 32b, total_weight 64b */
4050  total_weight += sql_script[i].weight;
4051 
4052  if (total_weight == 0 && !is_init_mode)
4053  {
4054  fprintf(stderr, "total script weight must not be zero\n");
4055  exit(1);
4056  }
4057 
4058  /* show per script stats if several scripts are used */
4059  if (num_scripts > 1)
4060  per_script_stats = true;
4061 
4062  /*
4063  * Don't need more threads than there are clients. (This is not merely an
4064  * optimization; throttle_delay is calculated incorrectly below if some
4065  * threads have no clients assigned to them.)
4066  */
4067  if (nthreads > nclients)
4068  nthreads = nclients;
4069 
4070  /* compute a per thread delay */
4071  throttle_delay *= nthreads;
4072 
4073  if (argc > optind)
4074  dbName = argv[optind];
4075  else
4076  {
4077  if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
4078  dbName = env;
4079  else if (login != NULL && *login != '\0')
4080  dbName = login;
4081  else
4082  dbName = "";
4083  }
4084 
4085  if (is_init_mode)
4086  {
4087  if (benchmarking_option_set)
4088  {
4089  fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
4090  exit(1);
4091  }
4092 
4093  init(is_no_vacuum);
4094  exit(0);
4095  }
4096  else
4097  {
4098  if (initialization_option_set)
4099  {
4100  fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
4101  exit(1);
4102  }
4103  }
4104 
4105  /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
4106  if (nxacts <= 0 && duration <= 0)
4107  nxacts = DEFAULT_NXACTS;
4108 
4109  /* --sampling-rate may be used only with -l */
4110  if (sample_rate > 0.0 && !use_log)
4111  {
4112  fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
4113  exit(1);
4114  }
4115 
4116  /* --sampling-rate may not be used with --aggregate-interval */
4117  if (sample_rate > 0.0 && agg_interval > 0)
4118  {
4119  fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
4120  exit(1);
4121  }
4122 
4123  if (agg_interval > 0 && !use_log)
4124  {
4125  fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
4126  exit(1);
4127  }
4128 
4129  if (!use_log && logfile_prefix)
4130  {
4131  fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
4132  exit(1);
4133  }
4134 
4135  if (duration > 0 && agg_interval > duration)
4136  {
4137  fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
4138  exit(1);
4139  }
4140 
4141  if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
4142  {
4143  fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
4144  exit(1);
4145  }
4146 
4147  if (progress_timestamp && progress == 0)
4148  {
4149  fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
4150  exit(1);
4151  }
4152 
4153  /*
4154  * save main process id in the global variable because process id will be
4155  * changed after fork.
4156  */
4157  main_pid = (int) getpid();
4158 
4159  if (nclients > 1)
4160  {
4161  state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
4162  memset(state + 1, 0, sizeof(CState) * (nclients - 1));
4163 
4164  /* copy any -D switch values to all clients */
4165  for (i = 1; i < nclients; i++)
4166  {
4167  int j;
4168 
4169  state[i].id = i;
4170  for (j = 0; j < state[0].nvariables; j++)
4171  {
4172  Variable *var = &state[0].variables[j];
4173 
4174  if (var->is_numeric)
4175  {
4176  if (!putVariableNumber(&state[i], "startup",
4177  var->name, &var->num_value))
4178  exit(1);
4179  }
4180  else
4181  {
4182  if (!putVariable(&state[i], "startup",
4183  var->name, var->value))
4184  exit(1);
4185  }
4186  }
4187  }
4188  }
4189 
4190  if (debug)
4191  {
4192  if (duration <= 0)
4193  printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
4194  pghost, pgport, nclients, nxacts, dbName);
4195  else
4196  printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
4197  pghost, pgport, nclients, duration, dbName);
4198  }
4199 
4200  /* opening connection... */
4201  con = doConnect();
4202  if (con == NULL)
4203  exit(1);
4204 
4205  if (PQstatus(con) == CONNECTION_BAD)
4206  {
4207  fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
4208  fprintf(stderr, "%s", PQerrorMessage(con));
4209  exit(1);
4210  }
4211 
4212  if (internal_script_used)
4213  {
4214  /*
4215  * get the scaling factor that should be same as count(*) from
4216  * pgbench_branches if this is not a custom query
4217  */
4218  res = PQexec(con, "select count(*) from pgbench_branches");
4219  if (PQresultStatus(res) != PGRES_TUPLES_OK)
4220  {
4221  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
4222 
4223  fprintf(stderr, "%s", PQerrorMessage(con));
4224  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
4225  {
4226  fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
4227  }
4228 
4229  exit(1);
4230  }
4231  scale = atoi(PQgetvalue(res, 0, 0));
4232  if (scale < 0)
4233  {
4234  fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
4235  PQgetvalue(res, 0, 0));
4236  exit(1);
4237  }
4238  PQclear(res);
4239 
4240  /* warn if we override user-given -s switch */
4241  if (scale_given)
4242  fprintf(stderr,
4243  "scale option ignored, using count from pgbench_branches table (%d)\n",
4244  scale);
4245  }
4246 
4247  /*
4248  * :scale variables normally get -s or database scale, but don't override
4249  * an explicit -D switch
4250  */
4251  if (lookupVariable(&state[0], "scale") == NULL)
4252  {
4253  for (i = 0; i < nclients; i++)
4254  {
4255  if (!putVariableInt(&state[i], "startup", "scale", scale))
4256  exit(1);
4257  }
4258  }
4259 
4260  /*
4261  * Define a :client_id variable that is unique per connection. But don't
4262  * override an explicit -D switch.
4263  */
4264  if (lookupVariable(&state[0], "client_id") == NULL)
4265  {
4266  for (i = 0; i < nclients; i++)
4267  {
4268  if (!putVariableInt(&state[i], "startup", "client_id", i))
4269  exit(1);
4270  }
4271  }
4272 
4273  if (!is_no_vacuum)
4274  {
4275  fprintf(stderr, "starting vacuum...");
4276  tryExecuteStatement(con, "vacuum pgbench_branches");
4277  tryExecuteStatement(con, "vacuum pgbench_tellers");
4278  tryExecuteStatement(con, "truncate pgbench_history");
4279  fprintf(stderr, "end.\n");
4280 
4281  if (do_vacuum_accounts)
4282  {
4283  fprintf(stderr, "starting vacuum pgbench_accounts...");
4284  tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
4285  fprintf(stderr, "end.\n");
4286  }
4287  }
4288  PQfinish(con);
4289 
4290  /* set random seed */
4291  INSTR_TIME_SET_CURRENT(start_time);
4292  srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
4293 
4294  /* set up thread data structures */
4295  threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
4296  nclients_dealt = 0;
4297 
4298  for (i = 0; i < nthreads; i++)
4299  {
4300  TState *thread = &threads[i];
4301 
4302  thread->tid = i;
4303  thread->state = &state[nclients_dealt];
4304  thread->nstate =
4305  (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
4306  thread->random_state[0] = random();
4307  thread->random_state[1] = random();
4308  thread->random_state[2] = random();
4309  thread->logfile = NULL; /* filled in later */
4310  thread->latency_late = 0;
4311  initStats(&thread->stats, 0);
4312 
4313  nclients_dealt += thread->nstate;
4314  }
4315 
4316  /* all clients must be assigned to a thread */
4317  Assert(nclients_dealt == nclients);
4318 
4319  /* get start up time */
4320  INSTR_TIME_SET_CURRENT(start_time);
4321 
4322  /* set alarm if duration is specified. */
4323  if (duration > 0)
4324  setalarm(duration);
4325 
4326  /* start threads */
4327 #ifdef ENABLE_THREAD_SAFETY
4328  for (i = 0; i < nthreads; i++)
4329  {
4330  TState *thread = &threads[i];
4331 
4333 
4334  /* compute when to stop */
4335  if (duration > 0)
4336  end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
4337  (int64) 1000000 * duration;
4338 
4339  /* the first thread (i = 0) is executed by main thread */
4340  if (i > 0)
4341  {
4342  int err = pthread_create(&thread->thread, NULL, threadRun, thread);
4343 
4344  if (err != 0 || thread->thread == INVALID_THREAD)
4345  {
4346  fprintf(stderr, "could not create thread: %s\n", strerror(err));
4347  exit(1);
4348  }
4349  }
4350  else
4351  {
4352  thread->thread = INVALID_THREAD;
4353  }
4354  }
4355 #else
4356  INSTR_TIME_SET_CURRENT(threads[0].start_time);
4357  /* compute when to stop */
4358  if (duration > 0)
4359  end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
4360  (int64) 1000000 * duration;
4361  threads[0].thread = INVALID_THREAD;
4362 #endif /* ENABLE_THREAD_SAFETY */
4363 
4364  /* wait for threads and accumulate results */
4365  initStats(&stats, 0);
4366  INSTR_TIME_SET_ZERO(conn_total_time);
4367  for (i = 0; i < nthreads; i++)
4368  {
4369  TState *thread = &threads[i];
4370 
4371 #ifdef ENABLE_THREAD_SAFETY
4372  if (threads[i].thread == INVALID_THREAD)
4373  /* actually run this thread directly in the main thread */
4374  (void) threadRun(thread);
4375  else
4376  /* wait of other threads. should check that 0 is returned? */
4377  pthread_join(thread->thread, NULL);
4378 #else
4379  (void) threadRun(thread);
4380 #endif /* ENABLE_THREAD_SAFETY */
4381 
4382  /* aggregate thread level stats */
4383  mergeSimpleStats(&stats.latency, &thread->stats.latency);
4384  mergeSimpleStats(&stats.lag, &thread->stats.lag);
4385  stats.cnt += thread->stats.cnt;
4386  stats.skipped += thread->stats.skipped;
4387  latency_late += thread->latency_late;
4388  INSTR_TIME_ADD(conn_total_time, thread->conn_time);
4389  }
4390  disconnect_all(state, nclients);
4391 
4392  /*
4393  * XXX We compute results as though every client of every thread started
4394  * and finished at the same time. That model can diverge noticeably from
4395  * reality for a short benchmark run involving relatively many threads.
4396  * The first thread may process notably many transactions before the last
4397  * thread begins. Improving the model alone would bring limited benefit,
4398  * because performance during those periods of partial thread count can
4399  * easily exceed steady state performance. This is one of the many ways
4400  * short runs convey deceptive performance figures.
4401  */
4402  INSTR_TIME_SET_CURRENT(total_time);
4403  INSTR_TIME_SUBTRACT(total_time, start_time);
4404  printResults(threads, &stats, total_time, conn_total_time, latency_late);
4405 
4406  return 0;
4407 }
4408 
4409 static void *
4411 {
4412  TState *thread = (TState *) arg;
4413  CState *state = thread->state;
4414  instr_time start,
4415  end;
4416  int nstate = thread->nstate;
4417  int remains = nstate; /* number of remaining clients */
4418  int i;
4419 
4420  /* for reporting progress: */
4421  int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
4422  int64 last_report = thread_start;
4423  int64 next_report = last_report + (int64) progress * 1000000;
4424  StatsData last,
4425  aggs;
4426 
4427  /*
4428  * Initialize throttling rate target for all of the thread's clients. It
4429  * might be a little more accurate to reset thread->start_time here too.
4430  * The possible drift seems too small relative to typical throttle delay
4431  * times to worry about it.
4432  */
4433  INSTR_TIME_SET_CURRENT(start);
4434  thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
4435 
4436  INSTR_TIME_SET_ZERO(thread->conn_time);
4437 
4438  initStats(&aggs, time(NULL));
4439  last = aggs;
4440 
4441  /* open log file if requested */
4442  if (use_log)
4443  {
4444  char logpath[MAXPGPATH];
4445  char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
4446 
4447  if (thread->tid == 0)
4448  snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
4449  else
4450  snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
4451 
4452  thread->logfile = fopen(logpath, "w");
4453 
4454  if (thread->logfile == NULL)
4455  {
4456  fprintf(stderr, "could not open logfile \"%s\": %s\n",
4457  logpath, strerror(errno));
4458  goto done;
4459  }
4460  }
4461 
4462  if (!is_connect)
4463  {
4464  /* make connections to the database */
4465  for (i = 0; i < nstate; i++)
4466  {
4467  if ((state[i].con = doConnect()) == NULL)
4468  goto done;
4469  }
4470  }
4471 
4472  /* time after thread and connections set up */
4474  INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
4475 
4476  /* explicitly initialize the state machines */
4477  for (i = 0; i < nstate; i++)
4478  {
4479  state[i].state = CSTATE_CHOOSE_SCRIPT;
4480  }
4481 
4482  /* loop till all clients have terminated */
4483  while (remains > 0)
4484  {
4485  fd_set input_mask;
4486  int maxsock; /* max socket number to be waited for */
4487  int64 min_usec;
4488  int64 now_usec = 0; /* set this only if needed */
4489 
4490  /* identify which client sockets should be checked for input */
4491  FD_ZERO(&input_mask);
4492  maxsock = -1;
4493  min_usec = PG_INT64_MAX;
4494  for (i = 0; i < nstate; i++)
4495  {
4496  CState *st = &state[i];
4497 
4498  if (st->state == CSTATE_THROTTLE && timer_exceeded)
4499  {
4500  /* interrupt client that has not started a transaction */
4501  st->state = CSTATE_FINISHED;
4502  PQfinish(st->con);
4503  st->con = NULL;
4504  remains--;
4505  }
4506  else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
4507  {
4508  /* a nap from the script, or under throttling */
4509  int64 this_usec;
4510 
4511  /* get current time if needed */
4512  if (now_usec == 0)
4513  {
4514  instr_time now;
4515 
4517  now_usec = INSTR_TIME_GET_MICROSEC(now);
4518  }
4519 
4520  /* min_usec should be the minimum delay across all clients */
4521  this_usec = (st->state == CSTATE_SLEEP ?
4522  st->sleep_until : st->txn_scheduled) - now_usec;
4523  if (min_usec > this_usec)
4524  min_usec = this_usec;
4525  }
4526  else if (st->state == CSTATE_WAIT_RESULT)
4527  {
4528  /*
4529  * waiting for result from server - nothing to do unless the
4530  * socket is readable
4531  */
4532  int sock = PQsocket(st->con);
4533 
4534  if (sock < 0)
4535  {
4536  fprintf(stderr, "invalid socket: %s",
4537  PQerrorMessage(st->con));
4538  goto done;
4539  }
4540 
4541  FD_SET(sock, &input_mask);
4542  if (maxsock < sock)
4543  maxsock = sock;
4544  }
4545  else if (st->state != CSTATE_ABORTED &&
4546  st->state != CSTATE_FINISHED)
4547  {
4548  /*
4549  * This client thread is ready to do something, so we don't
4550  * want to wait. No need to examine additional clients.
4551  */
4552  min_usec = 0;
4553  break;
4554  }
4555  }
4556 
4557  /* also wake up to print the next progress report on time */
4558  if (progress && min_usec > 0 && thread->tid == 0)
4559  {
4560  /* get current time if needed */
4561  if (now_usec == 0)
4562  {
4563  instr_time now;
4564 
4566  now_usec = INSTR_TIME_GET_MICROSEC(now);
4567  }
4568 
4569  if (now_usec >= next_report)
4570  min_usec = 0;
4571  else if ((next_report - now_usec) < min_usec)
4572  min_usec = next_report - now_usec;
4573  }
4574 
4575  /*
4576  * If no clients are ready to execute actions, sleep until we receive
4577  * data from the server, or a nap-time specified in the script ends,
4578  * or it's time to print a progress report. Update input_mask to show
4579  * which client(s) received data.
4580  */
4581  if (min_usec > 0)
4582  {
4583  int nsocks = 0; /* return from select(2) if called */
4584 
4585  if (min_usec != PG_INT64_MAX)
4586  {
4587  if (maxsock != -1)
4588  {
4589  struct timeval timeout;
4590 
4591  timeout.tv_sec = min_usec / 1000000;
4592  timeout.tv_usec = min_usec % 1000000;
4593  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
4594  }
4595  else /* nothing active, simple sleep */
4596  {
4597  pg_usleep(min_usec);
4598  }
4599  }
4600  else /* no explicit delay, select without timeout */
4601  {
4602  nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
4603  }
4604 
4605  if (nsocks < 0)
4606  {
4607  if (errno == EINTR)
4608  {
4609  /* On EINTR, go back to top of loop */
4610  continue;
4611  }
4612  /* must be something wrong */
4613  fprintf(stderr, "select() failed: %s\n", strerror(errno));
4614  goto done;
4615  }
4616  }
4617  else /* min_usec == 0, i.e. something needs to be executed */
4618  {
4619  /* If we didn't call select(), don't try to read any data */
4620  FD_ZERO(&input_mask);
4621  }
4622 
4623  /* ok, advance the state machine of each connection */
4624  for (i = 0; i < nstate; i++)
4625  {
4626  CState *st = &state[i];
4627 
4628  if (st->state == CSTATE_WAIT_RESULT)
4629  {
4630  /* don't call doCustom unless data is available */
4631  int sock = PQsocket(st->con);
4632 
4633  if (sock < 0)
4634  {
4635  fprintf(stderr, "invalid socket: %s",
4636  PQerrorMessage(st->con));
4637  goto done;
4638  }
4639 
4640  if (!FD_ISSET(sock, &input_mask))
4641  continue;
4642  }
4643  else if (st->state == CSTATE_FINISHED ||
4644  st->state == CSTATE_ABORTED)
4645  {
4646  /* this client is done, no need to consider it anymore */
4647  continue;
4648  }
4649 
4650  doCustom(thread, st, &aggs);
4651 
4652  /* If doCustom changed client to finished state, reduce remains */
4653  if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
4654  remains--;
4655  }
4656 
4657  /* progress report is made by thread 0 for all threads */
4658  if (progress && thread->tid == 0)
4659  {
4660  instr_time now_time;
4661  int64 now;
4662 
4663  INSTR_TIME_SET_CURRENT(now_time);
4664  now = INSTR_TIME_GET_MICROSEC(now_time);
4665  if (now >= next_report)
4666  {
4667  /* generate and show report */
4668  StatsData cur;
4669  int64 run = now - last_report;
4670  double tps,
4671  total_run,
4672  latency,
4673  sqlat,
4674  lag,
4675  stdev;
4676  char tbuf[64];
4677 
4678  /*
4679  * Add up the statistics of all threads.
4680  *
4681  * XXX: No locking. There is no guarantee that we get an
4682  * atomic snapshot of the transaction count and latencies, so
4683  * these figures can well be off by a small amount. The
4684  * progress report's purpose is to give a quick overview of
4685  * how the test is going, so that shouldn't matter too much.
4686  * (If a read from a 64-bit integer is not atomic, you might
4687  * get a "torn" read and completely bogus latencies though!)
4688  */
4689  initStats(&cur, 0);
4690  for (i = 0; i < nthreads; i++)
4691  {
4692  mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
4693  mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
4694  cur.cnt += thread[i].stats.cnt;
4695  cur.skipped += thread[i].stats.skipped;
4696  }
4697 
4698  total_run = (now - thread_start) / 1000000.0;
4699  tps = 1000000.0 * (cur.cnt - last.cnt) / run;
4700  latency = 0.001 * (cur.latency.sum - last.latency.sum) /
4701  (cur.cnt - last.cnt);
4702  sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
4703  / (cur.cnt - last.cnt);
4704  stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
4705  lag = 0.001 * (cur.lag.sum - last.lag.sum) /
4706  (cur.cnt - last.cnt);
4707 
4708  if (progress_timestamp)
4709  {
4710  /*
4711  * On some platforms the current system timestamp is
4712  * available in now_time, but rather than get entangled
4713  * with that, we just eat the cost of an extra syscall in
4714  * all cases.
4715  */
4716  struct timeval tv;
4717 
4718  gettimeofday(&tv, NULL);
4719  snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
4720  (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
4721  }
4722  else
4723  snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
4724 
4725  fprintf(stderr,
4726  "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
4727  tbuf, tps, latency, stdev);
4728 
4729  if (throttle_delay)
4730  {
4731  fprintf(stderr, ", lag %.3f ms", lag);
4732  if (latency_limit)
4733  fprintf(stderr, ", " INT64_FORMAT " skipped",
4734  cur.skipped - last.skipped);
4735  }
4736  fprintf(stderr, "\n");
4737 
4738  last = cur;
4739  last_report = now;
4740 
4741  /*
4742  * Ensure that the next report is in the future, in case
4743  * pgbench/postgres got stuck somewhere.
4744  */
4745  do
4746  {
4747  next_report += (int64) progress * 1000000;
4748  } while (now >= next_report);
4749  }
4750  }
4751  }
4752 
4753 done:
4754  INSTR_TIME_SET_CURRENT(start);
4755  disconnect_all(state, nstate);
4757  INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
4758  if (thread->logfile)
4759  {
4760  if (agg_interval > 0)
4761  {
4762  /* log aggregated but not yet reported transactions */
4763  doLog(thread, state, &aggs, false, 0, 0);
4764  }
4765  fclose(thread->logfile);
4766  thread->logfile = NULL;
4767  }
4768  return NULL;
4769 }
4770 
4771 /*
4772  * Support for duration option: set timer_exceeded after so many seconds.
4773  */
4774 
4775 #ifndef WIN32
4776 
4777 static void
4779 {
4780  timer_exceeded = true;
4781 }
4782 
4783 static void
4784 setalarm(int seconds)
4785 {
4787  alarm(seconds);
4788 }
4789 
4790 #else /* WIN32 */
4791 
4792 static VOID CALLBACK
4793 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
4794 {
4795  timer_exceeded = true;
4796 }
4797 
4798 static void
4799 setalarm(int seconds)
4800 {
4801  HANDLE queue;
4802  HANDLE timer;
4803 
4804  /* This function will be called at most once, so we can cheat a bit. */
4805  queue = CreateTimerQueue();
4806  if (seconds > ((DWORD) -1) / 1000 ||
4807  !CreateTimerQueueTimer(&timer, queue,
4808  win32_timer_callback, NULL, seconds * 1000, 0,
4809  WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
4810  {
4811  fprintf(stderr, "failed to set timer\n");
4812  exit(1);
4813  }
4814 }
4815 
4816 /* partial pthread implementation for Windows */
4817 
4818 typedef struct win32_pthread
4819 {
4820  HANDLE handle;
4821  void *(*routine) (void *);
4822  void *arg;
4823  void *result;
4824 } win32_pthread;
4825 
4826 static unsigned __stdcall
4827 win32_pthread_run(void *arg)
4828 {
4829  win32_pthread *th = (win32_pthread *) arg;
4830 
4831  th->result = th->routine(th->arg);
4832 
4833  return 0;
4834 }
4835 
4836 static int
4837 pthread_create(pthread_t *thread,
4838  pthread_attr_t *attr,
4839  void *(*start_routine) (void *),
4840  void *arg)
4841 {
4842  int save_errno;
4843  win32_pthread *th;
4844 
4845  th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
4846  th->routine = start_routine;
4847  th->arg = arg;
4848  th->result = NULL;
4849 
4850  th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
4851  if (th->handle == NULL)
4852  {
4853  save_errno = errno;
4854  free(th);
4855  return save_errno;
4856  }
4857 
4858  *thread = th;
4859  return 0;
4860 }
4861 
4862 static int
4863 pthread_join(pthread_t th, void **thread_return)
4864 {
4865  if (th == NULL || th->handle == NULL)
4866  return errno = EINVAL;
4867 
4868  if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
4869  {
4870  _dosmaperr(GetLastError());
4871  return errno;
4872  }
4873 
4874  if (thread_return)
4875  *thread_return = th->result;
4876 
4877  CloseHandle(th->handle);
4878  free(th);
4879  return 0;
4880 }
4881 
4882 #endif /* WIN32 */
time_t start_time
Definition: pgbench.c:231
static char password[100]
Definition: streamutil.c:45
int length(const List *list)
Definition: list.c:1271
PgBenchValue constant
Definition: pgbench.h:90
static const BuiltinScript * findBuiltin(const char *name)
Definition: pgbench.c:3403
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *)
Definition: pgbench.c:1682
int64 throttle_trigger
Definition: pgbench.c:346
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:1941
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6097
int unlogged_tables
Definition: pgbench.c:122
PsqlScanResult
Definition: psqlscan.h:30
void expr_scanner_finish(yyscan_t yyscanner)
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:105
double sample_rate
Definition: pgbench.c:127
static Variable * lookupVariable(CState *st, char *name)
Definition: pgbench.c:939
static bool putVariableNumber(CState *st, const char *context, char *name, const PgBenchValue *value)
Definition: pgbench.c:1132
#define PG_INT64_MAX
Definition: c.h:328
int main(int argc, char **argv)
Definition: pgbench.c:3634
static int64 getrand(TState *thread, int64 min, int64 max)
Definition: pgbench.c:627
#define ERRCODE_UNDEFINED_TABLE
Definition: pgbench.c:61
int type
Definition: pgbench.c:380
#define INVALID_THREAD
Definition: pgbench.c:356
int command_num
Definition: pgbench.c:379
int expr_yyparse(yyscan_t yyscanner)
ConnectionStateEnum
Definition: pgbench.c:242
int id
Definition: pgbench.c:312
const char * name
Definition: pgbench.c:405
static void ParseScript(const char *script, const char *desc, int weight)
Definition: pgbench.c:3218
static int chooseScript(TState *thread)
Definition: pgbench.c:1849
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1234
static void discard_response(CState *state)
Definition: pgbench.c:917
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3118
static int num_scripts
Definition: pgbench.c:396
char * line
Definition: pgbench.c:378
unsigned short random_state[3]
Definition: pgbench.c:345
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void listAvailableScripts(void)
Definition: pgbench.c:3391
static Command * process_sql_command(PQExpBuffer buf, const char *source)
Definition: pgbench.c:2995
char * name
Definition: pgbench.c:201
int nclients
Definition: pgbench.c:174
const char * get_progname(const char *argv0)
Definition: path.c:453
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:128
PsqlScanState psql_scan_create(const PsqlScanCallbacks *callbacks)
struct BuiltinScript BuiltinScript
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:57
static int compareVariableNames(const void *v1, const void *v2)
Definition: pgbench.c:931
long random(void)
Definition: random.c:22
static void static void addScript(ParsedScript script)
Definition: pgbench.c:3484
static int parseScriptWeight(const char *option, char **script)
Definition: pgbench.c:3440
static const PsqlScanCallbacks pgbench_callbacks
Definition: pgbench.c:464
yyscan_t expr_scanner_init(PsqlScanState state, const char *source, int lineno, int start_offset, const char *command)
struct timeval instr_time
Definition: instr_time.h:147
#define Min(x, y)
Definition: c.h:812
#define COMMANDS_ALLOC_NUM
static void preparedStatementName(char *buffer, int file, int state)
Definition: pgbench.c:1834
void syntax_error(const char *source, int lineno, const char *line, const char *command, const char *msg, const char *more, int column)
Definition: pgbench.c:2960
struct PgBenchExpr::@40::@41 variable
union PgBenchValue::@39 u
int64 latency_late
Definition: pgbench.c:353
PgBenchExpr * expr
Definition: pgbench.c:383
SimpleStats lag
Definition: pgbench.c:236
struct ParsedScript ParsedScript
struct cursor * cur
Definition: ecpg.c:28
StatsData stats
Definition: pgbench.c:392
static void initStats(StatsData *sd, time_t start_time)
Definition: pgbench.c:784
double sum
Definition: pgbench.c:221
#define INSTR_TIME_ACCUM_DIFF(x, y, z)
Definition: instr_time.h:179
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3630
int weight
Definition: pgbench.c:390
int scale
Definition: pgbench.c:106
static void addToSimpleStats(SimpleStats *ss, double val)
Definition: pgbench.c:753
#define select(n, r, w, e, timeout)
Definition: win32.h:374
#define INSTR_TIME_SET_ZERO(t)
Definition: instr_time.h:151
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
bool use_log
Definition: pgbench.c:167
static char * getVariable(CState *st, char *name)
Definition: pgbench.c:966
static bool putVariable(CState *st, const char *context, char *name, const char *value)
Definition: pgbench.c:1109
static FILE * logfile
Definition: pg_regress.c:100
SimpleStats latency
Definition: pgbench.c:235
#define INSTR_TIME_GET_DOUBLE(t)
Definition: instr_time.h:196
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define lengthof(array)
Definition: c.h:556
static bool coerceToDouble(PgBenchValue *pval, double *dval)
Definition: pgbench.c:1283
#define MAX_SCRIPTS
Definition: pgbench.c:207
static void setDoubleValue(PgBenchValue *pv, double dval)
Definition: pgbench.c:1308
#define LOG_STEP_SECONDS
Definition: pgbench.c:93
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
#define MIN_GAUSSIAN_PARAM
Definition: pgbench.c:96
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define WSEP
Definition: pgbench.c:187
static void process_file(const char *filename, int weight)
Definition: pgbench.c:3350
bool per_script_stats
Definition: pgbench.c:171
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2647
char * argv[MAX_ARGS]
Definition: pgbench.c:382
bool use_quiet
Definition: pgbench.c:168
int duration
Definition: pgbench.c:99
static void commandFailed(CState *st, char *message)
Definition: pgbench.c:1840
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:529
static bool valid_variable_name(const char *name)
Definition: pgbench.c:1036
#define INSTR_TIME_IS_ZERO(t)
Definition: instr_time.h:149
static time_t start_time
Definition: pg_ctl.c:103
#define pg_attribute_printf(f, a)
Definition: c.h:627
#define MAX_ARGS
Definition: pgbench.c:363
static void executeStatement(PGconn *con, const char *sql)
Definition: pgbench.c:818
static bool makeVariableNumeric(Variable *var)
Definition: pgbench.c:995
#define SCALE_32BIT_THRESHOLD
Definition: pgbench.c:165
Definition: type.h:89
static int64 getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
Definition: pgbench.c:670
bool is_connect
Definition: pgbench.c:176
Variable * variables
Definition: pgbench.c:319
int PQputline(PGconn *conn, const char *s)
Definition: fe-exec.c:2540
int nstate
Definition: pgbench.c:344
static const char * QUERYMODE[]
Definition: pgbench.c:374
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1183
void pg_usleep(long microsec)
Definition: signal.c:53
#define MAX_FARGS
Definition: pgbench.c:1315
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:3525
int64 strtoint64(const char *str)
Definition: pgbench.c:564
#define SHELL_COMMAND_SIZE
Definition: pgbench.c:208
int64 end_time
Definition: pgbench.c:100
#define nbranches
Definition: pgbench.c:153
#define required_argument
Definition: getopt_long.h:25
FILE * logfile
Definition: pgbench.c:347
static void mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
Definition: pgbench.c:768
int foreign_keys
Definition: pgbench.c:117
int optind
Definition: getopt.c:51
#define IS_HIGHBIT_SET(ch)
Definition: c.h:979
instr_time stmt_begin
Definition: pgbench.c:327
int64 cnt
Definition: pgbench.c:232
CState * state
Definition: pgbench.c:343
int expr_scanner_get_lineno(PsqlScanState state, int offset)
static void printSimpleStats(char *prefix, SimpleStats *ss)
Definition: pgbench.c:3503
int fillfactor
Definition: pgbench.c:112
#define M_PI
Definition: pgbench.c:56
#define INSTR_TIME_SUBTRACT(x, y)
Definition: instr_time.h:167
#define META_COMMAND
Definition: pgbench.c:362
static bool evalFunc(TState *thread, CState *st, PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
Definition: pgbench.c:1321
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static int debug
Definition: pgbench.c:400
PGconn * conn
Definition: streamutil.c:46
bool vars_sorted
Definition: pgbench.c:321
#define MAXPGPATH
static struct @121 value
static void doCustom(TState *thread, CState *st, StatsData *agg)
Definition: pgbench.c:1987
static char * assignVariables(CState *st, char *sql)
Definition: pgbench.c:1213
char sign
Definition: informix.c:693
QueryMode
Definition: pgbench.c:365
char * c
int nxacts
Definition: pgbench.c:98
static char * buf
Definition: pg_test_fsync.c:67
#define INSTR_TIME_ADD(x, y)
Definition: instr_time.h:155
#define memmove(d, s, c)
Definition: c.h:1064
#define PG_INT64_MIN
Definition: c.h:327
static void disconnect_all(CState *state, int length)
Definition: pgbench.c:2590
const char * desc
Definition: pgbench.c:389
PgBenchExpr * expr_parse_result
bool is_numeric
Definition: pgbench.c:203
void simple_prompt(const char *prompt, char *destination, size_t destlen, bool echo)
Definition: sprompt.c:37
char * tablespace
Definition: pgbench.c:146
static void usage(void)
Definition: pgbench.c:471
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
int64 cnt
Definition: pgbench.c:332
#define PARAMS_ARRAY_SIZE
instr_time start_time
Definition: pgbench.c:350
char * index_tablespace
Definition: pgbench.c:147
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
int nthreads
Definition: pgbench.c:175
int64 txn_scheduled
Definition: pgbench.c:324
char * value
Definition: pgbench.c:202
int expr_scanner_offset(PsqlScanState state)
static ParsedScript sql_script[MAX_SCRIPTS]
Definition: pgbench.c:395
static bool sendCommand(CState *st, Command *command)
Definition: pgbench.c:1868
static Variable * lookupCreateVariable(CState *st, const char *context, char *name)
Definition: pgbench.c:1063
static bool is_an_int(const char *str)
Definition: pgbench.c:532
static char * replaceVariable(char **sql, char *param, int len, char *value)
Definition: pgbench.c:1193
int64 count
Definition: pgbench.c:218
static bool parseQuery(Command *cmd)
Definition: pgbench.c:2886
int64 skipped
Definition: pgbench.c:233
int ecnt
Definition: pgbench.c:333
static int64 total_weight
Definition: pgbench.c:398
char * pghost
Definition: pgbench.c:180
int progress
Definition: pgbench.c:172
static void doLog(TState *thread, CState *st, StatsData *agg, bool skipped, double latency, double lag)
Definition: pgbench.c:2462
#define CppAsString2(x)
Definition: c.h:162
#define no_argument
Definition: getopt_long.h:24
instr_time txn_begin
Definition: pgbench.c:326
static bool putVariableInt(CState *st, const char *context, char *name, int64 value)
Definition: pgbench.c:1153
#define MAX_PREPARE_NAME
Definition: pgbench.c:1832
#define ntellers
Definition: pgbench.c:155
typedef VOID(WINAPI *PgGetSystemTimeFn)(LPFILETIME)
int argc
Definition: pgbench.c:381
#define SQL_COMMAND
Definition: pgbench.c:361
double max
Definition: pgbench.c:220
static const BuiltinScript builtin_script[]
Definition: pgbench.c:410
#define EINTR
Definition: win32.h:285
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1682
int command
Definition: pgbench.c:316
static void * threadRun(void *arg)
Definition: pgbench.c:4410
static void accumStats(StatsData *stats, bool skipped, double lat, double lag)
Definition: pgbench.c:797
void psql_scan_destroy(PsqlScanState state)
SimpleStats stats
Definition: pgbench.c:384
#define MAXCLIENTS
Definition: pgbench.c:90
#define naccounts
Definition: pgbench.c:156
double pg_erand48(unsigned short xseed[3])
Definition: erand48.c:79
ConnectionStateEnum state
Definition: pgbench.c:313
const char * desc
Definition: pgbench.c:406
static bool runShellCommand(CState *st, char *variable, char **argv, int argc)
Definition: pgbench.c:1729
double dval
Definition: pgbench.h:47
static void getQueryParams(CState *st, const Command *command, const char **params)
Definition: pgbench.c:1249
static int64 getPoissonRand(TState *thread, int64 center)
Definition: pgbench.c:726
void PQclear(PGresult *res)
Definition: fe-exec.c:671
int64 sleep_until
Definition: pgbench.c:325
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define free(a)
Definition: header.h:65
static void printResults(TState *threads, StatsData *total, instr_time total_time, instr_time conn_total_time, int latency_late)
Definition: pgbench.c:3515
bool progress_timestamp
Definition: pgbench.c:173
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static bool evaluateSleep(CState *st, int argc, char **argv, int *usecs)
Definition: pgbench.c:1951
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:5965
static bool have_password
Definition: streamutil.c:44
#define Max(x, y)
Definition: c.h:806
int PQendcopy(PGconn *conn)
Definition: fe-exec.c:2573
PGconn * con
Definition: pgbench.c:311
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2709
#define SIGNAL_ARGS
Definition: c.h:1085
void * yyscan_t
Definition: psqlscan_int.h:60
static PGconn * doConnect(void)
Definition: pgbench.c:848
#define Assert(condition)
Definition: c.h:681
void _dosmaperr(unsigned long)
Definition: win32error.c:171
double sum2
Definition: pgbench.c:222
Definition: regguts.h:298
const char * progname
Definition: pgbench.c:185
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1732
char * logfile_prefix
Definition: pgbench.c:184
#define INSTR_TIME_GET_MICROSEC(t)
Definition: instr_time.h:202
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
static void setIntValue(PgBenchValue *pv, int64 ival)
Definition: pgbench.c:1300
char * expr_scanner_get_substring(PsqlScanState state, int start_offset, int end_offset, bool chomp)
enum _promptStatus promptStatus_t
static char * read_file_contents(FILE *fd)
Definition: pgbench.c:3317
volatile bool timer_exceeded
Definition: pgbench.c:189
Command ** commands
Definition: pgbench.c:391
struct SimpleStats SimpleStats
static void init(bool is_no_vacuum)
Definition: pgbench.c:2606
static QueryMode querymode
Definition: pgbench.c:373
void pg_free(void *ptr)
Definition: fe_memutils.c:105
static char * parseVariable(const char *sql, int *eaten)
Definition: pgbench.c:1170
int64 latency_limit
Definition: pgbench.c:141
#define INSTR_TIME_SET_CURRENT(t)
Definition: instr_time.h:153
#define INT64_FORMAT
Definition: c.h:300
const char * name
Definition: encode.c:521
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1
Definition: pgbench.c:2937
PsqlScanResult psql_scan(PsqlScanState state, PQExpBuffer query_buf, promptStatus_t *prompt)
pthread_t thread
Definition: pgbench.c:342
const char * script
Definition: pgbench.c:407
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1376
static Command * process_backslash_command(PsqlScanState sstate, const char *source)
Definition: pgbench.c:3058
PgBenchFunction function
Definition: pgbench.h:97
int main_pid
Definition: pgbench.c:178
int nvariables
Definition: pgbench.c:320
static core_yyscan_t yyscanner
Definition: pl_scanner.c:210
static Datum values[MAXATTR]
Definition: bootstrap.c:164
static void initSimpleStats(SimpleStats *ss)
Definition: pgbench.c:744
static char * filename
Definition: pg_dumpall.c:90
int PQconnectionNeedsPassword(const PGconn *conn)
Definition: fe-connect.c:6131
char * dbName
Definition: pgbench.c:183
int64 throttle_delay
Definition: pgbench.c:133
char * optarg
Definition: getopt.c:53
static void handle_sig_alarm(SIGNAL_ARGS)
Definition: pgbench.c:4778
PgBenchValueType type
Definition: pgbench.h:43
static int64 getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
Definition: pgbench.c:647
StatsData stats
Definition: pgbench.c:352
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static void process_builtin(const BuiltinScript *bi, int weight)
Definition: pgbench.c:3384
instr_time conn_time
Definition: pgbench.c:351
static bool coerceToInt(PgBenchValue *pval, int64 *ival)
Definition: pgbench.c:1259
void psql_scan_setup(PsqlScanState state, const char *line, int line_len, int encoding, bool std_strings)
bool prepared[MAX_SCRIPTS]
Definition: pgbench.c:329
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1897
static void processXactStats(TState *thread, CState *st, instr_time *now, bool skipped, StatsData *agg)
Definition: pgbench.c:2543
void srandom(unsigned int seed)
Definition: srandom.c:22
void * arg
int tid
Definition: pgbench.c:341
void psql_scan_finish(PsqlScanState state)
static void tryExecuteStatement(PGconn *con, const char *sql)
Definition: pgbench.c:833
static int num_commands
Definition: pgbench.c:397
#define DEFAULT_NXACTS
Definition: pgbench.c:94
#define pthread_t
Definition: pgbench.c:79
#define qsort(a, b, c, d)
Definition: port.h:447
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:145
PgBenchFunction
Definition: pgbench.h:61
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6044
static void setalarm(int seconds)
Definition: pgbench.c:4784
int64 ival
Definition: pgbench.h:46
bool expr_lex_one_word(PsqlScanState state, PQExpBuffer word_buf, int *offset)
struct StatsData StatsData
int use_file
Definition: pgbench.c:315
#define SIGALRM
Definition: win32.h:194
#define _(x)
Definition: elog.c:84
void PQfreemem(void *ptr)
Definition: fe-exec.c:3251
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6115
char * pgport
Definition: pgbench.c:181
union PgBenchExpr::@40 u
long val
Definition: informix.c:689
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1753
bool is_latencies
Definition: pgbench.c:177
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:89
double min
Definition: pgbench.c:219
char * login
Definition: pgbench.c:182
int agg_interval
Definition: pgbench.c:169
PgBenchValue num_value
Definition: pgbench.c:204
PgBenchExprType etype
Definition: pgbench.h:87